Using database actors

In the previous example in this chapter, we used shared State to provide access to a counter stored as i64, wrapped with RefCell. We reuse this struct, but add a CacheLink field to use connections with a CacheActor to get or set cached values. Add this field:

struct State {
counter: RefCell<i64>,
cache: CacheLink,
}

We derived a Default trait for the State struct before, but now we need a new constructor, because we have to provide a CacheLink instance with the actual address of the caching actor:

impl State {
fn new(cache: CacheLink) -> Self {
Self {
counter: RefCell::default(),
cache,
}
}
}

In most cases, caching works this way—it tries to extract a value from a cache; if it exists and hasn't expired, then the value is returned to a client. If there is no valid value, we need to obtain a new one. After we have taken it, we have to store it in a cache for future use.

In the previous example, we often used a Future instance that receives a Response from another microservice. To simplify our use of caching, let's add the cache method to our State implementation. This method will wrap any provided future with a path and try to extract the cached value. If the value isn't available, it will obtain a new one, and afterwards, it receives the store-copied value to cache, and returns the value to the client. This method wraps the provided Future value with another Future trait implementation. Look at the following implementation:

fn cache<F>(&self, path: &str, fut: F)
-> impl Future<Item = Vec<u8>, Error = Error>
where
F: Future<Item = Vec<u8>, Error = Error> + 'static,
{
let link = self.cache.clone();
let path = path.to_owned();
link.get_value(&path)
.from_err::<Error>()
.and_then(move |opt| {
if let Some(cached) = opt {
debug!("Cached value used");
boxed(future::ok(cached))
} else {
let res = fut.and_then(move |data| {
link.set_value(&path, &data)
.then(move |_| {
debug!("Cache updated");
future::ok::<_, Error>(data)
})
.from_err::<Error>()
});
boxed(res)
}
})
}

The implementation uses the State instance to clone CacheLink. We have to use a cloned link because we have to move it to the closure that uses it to store a new value, should we need to obtain it.

First, we call the get_value method of CacheLink and get a Future that requests a value from the cache. Since the method returns Option, we will use the and_then method to check that the value exists in a cache, and return that value to the client. If the value is expired or not available, we will obtain it by executing the provided Future and use a link to call the set_value method if the new value is returned successfully.

Now, we can use the cache method to cache the list of comments that are returned for the comments handler of the previous example:

fn comments(req: HttpRequest<State>) -> FutureResponse<HttpResponse> {
let fut = get_request("http://127.0.0.1:8003/list");
let fut = req.state().cache("/list", fut)
.map(|data| {
HttpResponse::Ok().body(data)
});
Box::new(fut)
}

First, we create a Future to get a value from another microservice using the get_request method that we have implemented before. After that, we get a reference to State using the state method of the request, and call the cache method by passing the /list path, then create a Future instance to obtain a new value.

We have implemented all of the parts of our database actor. We still need to start a set of caching actors with SyncArbiter, and wrap the returned Addr value with CacheLink:

let addr = SyncArbiter::start(3, || {
CacheActor::new("redis://127.0.0.1:6379/", 10)
});
let cache = CacheLink::new(addr);
server::new(move || {
let state = State::new(cache.clone());
App::with_state(state)
// remains part App building
})

Now, you can build the server. It will return the cached value of the /api/list request every 10 seconds.

The other good benefit of using actors is WebSocket. With this, we can add stateful interaction to our microservices using a state-machine implemented as an actor. Let's look at this in the next section.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.15.78.83