We used the basic_consume method of a Channel to create a Stream that yields Delivery objects from a queue. To attach that Stream to the actor, we have to implement StreamHandler for the QueueActor type:
impl<T: QueueHandler> StreamHandler<Delivery, LapinError> for QueueActor<T> {
    fn handle(&mut self, item: Delivery, ctx: &mut Context<Self>) {
        debug!("Message received!");
        let fut = self
            .channel
            .basic_ack(item.delivery_tag, false)
            .map_err(drop);
        ctx.spawn(wrap_future(fut));
        match self.process_message(item, ctx) {
            Ok(pair) => {
                if let Some((corr_id, data)) = pair {
                    self.send_message(corr_id, data, ctx);
                }
            }
            Err(err) => {
                warn!("Message processing error: {}", err);
            }
        }
    }
}
Our StreamHandler implementation receives each Delivery instance produced by the stream. RabbitMQ expects a client to send an acknowledgement when it consumes a delivered message. We do this by calling the basic_ack method of the Channel instance stored as a field of QueueActor, passing the delivery tag of the message; the false argument means that only this single delivery is acknowledged. The call returns a Future, which we spawn in the Context to send the acknowledgement that the message was received. Any error from that Future is discarded with map_err(drop).
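The order of operations in this handler (acknowledge first, then process, then reply only if there is something to send) can be tried out without a broker. The following is a minimal sketch in plain Rust, not the actix or lapin API: Delivery here is a simplified stand-in, and MockChannel, ProcessError, Reply, and handle_delivery are hypothetical names introduced only for this illustration.

```rust
use std::fmt;

// Simplified stand-in for a delivered message (an assumption, not lapin's Delivery).
#[derive(Debug, Clone)]
pub struct Delivery {
    pub delivery_tag: u64,
    pub correlation_id: String,
    pub data: Vec<u8>,
}

// Hypothetical error type returned by the processing step.
#[derive(Debug)]
pub struct ProcessError(pub String);

impl fmt::Display for ProcessError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

// Hypothetical mock that records what would have been sent to the broker.
#[derive(Default)]
pub struct MockChannel {
    pub acked: Vec<u64>,
    pub sent: Vec<(String, Vec<u8>)>,
}

// An optional response: a correlation ID paired with the payload.
pub type Reply = Option<(String, Vec<u8>)>;

// Mirrors the control flow of the handler in the text:
// acknowledge, process, and send a response only when one was produced.
pub fn handle_delivery<F>(channel: &mut MockChannel, item: Delivery, process: F)
where
    F: FnOnce(Delivery) -> Result<Reply, ProcessError>,
{
    // The acknowledgement is issued before processing starts.
    channel.acked.push(item.delivery_tag);
    match process(item) {
        Ok(Some((corr_id, data))) => channel.sent.push((corr_id, data)),
        // Nothing to send back for this message.
        Ok(None) => {}
        // A processing failure is only reported; the message stays acknowledged.
        Err(err) => eprintln!("Message processing error: {}", err),
    }
}
```

Calling handle_delivery with closures that return Ok(Some(..)), Ok(None), and Err(..) records an acknowledgement in all three cases, but stores an outgoing message only in the first one.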
We use the process_message method, which we will implement later, to process the message with the QueueHandler instance. If this method returns a Some value, we use it as a response message and send it to the outgoing queue with the send_message method, which we will also implement later in this chapter. If it returns an error, we only log a warning. But first, we will add an actor message that initiates an outgoing message.