Crate

Add the following module declaration and imports to the src/lib.rs source file:

pub mod queue_actor;

use actix::{Message, SystemRunner};
use failure::Error;
use futures::Future;
use lapin::channel::{Channel, QueueDeclareOptions};
use lapin::client::{Client, ConnectionOptions};
use lapin::error::Error as LapinError;
use lapin::queue::Queue;
use lapin::types::FieldTable;
use serde_derive::{Deserialize, Serialize};
use tokio::net::TcpStream;

You are already familiar with some of these types, so let's discuss the new ones. Client represents a client connection to RabbitMQ. A Channel is created over an established connection and is returned by a Client. QueueDeclareOptions is passed as a parameter to the queue_declare method of a Channel. ConnectionOptions is required to establish a connection, but we will use the default values. Queue represents a queue in RabbitMQ.

We need two queues: one for requests and one for responses. We will use the correlation ID of a message to determine where it belongs. Add the following constants to be used as the names of the queues:

pub const REQUESTS: &str = "requests";
pub const RESPONSES: &str = "responses";
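
The role of a correlation ID can be sketched without RabbitMQ at all. The following is a minimal illustration, not part of lapin or of the crate built in this chapter: the Pending type, its methods, and the "req-N" ID format are hypothetical names chosen for the example. It assumes that the sender keeps a map of outstanding requests and looks each response up by the ID it carries.

```rust
use std::collections::HashMap;

/// Hypothetical bookkeeping for requests that still await a response.
/// This type is an assumption made for illustration only.
#[derive(Default)]
pub struct Pending {
    next_id: u64,
    waiting: HashMap<String, String>,
}

impl Pending {
    /// Stores the request payload and returns the correlation ID
    /// that would be attached to the outgoing message.
    pub fn register(&mut self, payload: &str) -> String {
        self.next_id += 1;
        let id = format!("req-{}", self.next_id);
        self.waiting.insert(id.clone(), payload.to_string());
        id
    }

    /// Looks up the request that a response belongs to and forgets it.
    /// Returns None when the correlation ID is unknown or already resolved.
    pub fn resolve(&mut self, correlation_id: &str) -> Option<String> {
        self.waiting.remove(correlation_id)
    }
}
```

Because every request gets a distinct ID, responses can arrive on the responses queue in any order and still be matched to the right request.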

To connect to RabbitMQ, we will add the spawn_client function, which creates a Client and produces a Channel from it:

pub fn spawn_client(sys: &mut SystemRunner) -> Result<Channel<TcpStream>, Error> {
    let addr = "127.0.0.1:5672".parse().unwrap();
    let fut = TcpStream::connect(&addr)
        .map_err(Error::from)
        .and_then(|stream| {
            let options = ConnectionOptions::default();
            Client::connect(stream, options).from_err::<Error>()
        });
    let (client, heartbeat) = sys.block_on(fut)?;
    actix::spawn(heartbeat.map_err(drop));
    let channel = sys.block_on(client.create_channel())?;
    Ok(channel)
}

The implementation of the preceding function is simple enough. We create a TcpStream by calling the connect method with a constant address. You can make the address configurable if necessary. The connect method returns a Future, which we extend with a combinator that turns the stream into a new Client connected to RabbitMQ. We use the block_on method of SystemRunner to execute that Future immediately. It returns a Client and a Heartbeat instance. The Client instance is used to create an instance of Channel. The Heartbeat instance is a task that pings RabbitMQ over the connection, and it has to be spawned as a concurrent activity in the event loop. We use actix::spawn to run it, because we don't have the Context of an Actor.

Finally, we call the create_channel method of a Client to create a Channel. This method also returns a Future, which we execute with the block_on method as well. Now, we can return the created Channel and implement the ensure_queue function, which expects that Channel instance as a parameter.
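
The remark about making the address configurable could be sketched as follows. This is only one possible approach, using the standard library alone: the resolve_addr and addr_from_env helpers and the AMQP_ADDR environment variable are hypothetical names introduced for this example, not something lapin or actix defines.

```rust
use std::env;
use std::net::{AddrParseError, SocketAddr};

/// The address that spawn_client uses in the text.
const DEFAULT_ADDR: &str = "127.0.0.1:5672";

/// Parses an optional override, falling back to the default address.
/// Returns an error instead of panicking when the value is malformed.
pub fn resolve_addr(raw: Option<&str>) -> Result<SocketAddr, AddrParseError> {
    raw.unwrap_or(DEFAULT_ADDR).parse()
}

/// Reads the override from the hypothetical AMQP_ADDR environment variable.
/// An unset (or non-Unicode) variable falls back to the default address.
pub fn addr_from_env() -> Result<SocketAddr, AddrParseError> {
    let raw = env::var("AMQP_ADDR").ok();
    resolve_addr(raw.as_deref())
}
```

The returned SocketAddr could be passed to TcpStream::connect in place of the parsed constant. Note that parsing a SocketAddr accepts only IP literals with a port, so a host name such as localhost:5672 would be rejected and would need to be resolved separately.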

The ensure_queue function builds the options for a queue_declare call, which creates a queue inside RabbitMQ:

pub fn ensure_queue(
    chan: &Channel<TcpStream>,
    name: &str,
) -> impl Future<Item = Queue, Error = LapinError> {
    let opts = QueueDeclareOptions {
        auto_delete: true,
        ..Default::default()
    };
    let table = FieldTable::new();
    chan.queue_declare(name, opts, table)
}

We fill QueueDeclareOptions with default parameters, but set the auto_delete field to true, because we want the created queues to be deleted when the application ends. This is suitable for testing purposes. In this function, we don't immediately execute the Future that the queue_declare method returns. We return it as is, so that the caller can build a combinator on the returned Queue value.

We have implemented all the necessary parts to create a server and a worker. Now, we need to declare request and response types to use them in a worker and in a server.
