Handling an incoming stream

We used the basic_consume method of a Channel to create a Stream that returns Delivery objects from a queue. Since we want to attach that Stream to the actor, we have to implement StreamHandler for the QueueActor type:

impl<T: QueueHandler> StreamHandler<Delivery, LapinError> for QueueActor<T> {
    fn handle(&mut self, item: Delivery, ctx: &mut Context<Self>) {
        debug!("Message received!");
        let fut = self
            .channel
            .basic_ack(item.delivery_tag, false)
            .map_err(drop);
        ctx.spawn(wrap_future(fut));
        match self.process_message(item, ctx) {
            Ok(pair) => {
                if let Some((corr_id, data)) = pair {
                    self.send_message(corr_id, data, ctx);
                }
            }
            Err(err) => {
                warn!("Message processing error: {}", err);
            }
        }
    }
}

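Our StreamHandler implementation receives a Delivery instance for each message. RabbitMQ expects a client to send an acknowledgement when it consumes a delivered message. We do this by calling the basic_ack method of the Channel instance stored as a field of QueueActor, passing the delivery_tag of the message we received. This call returns a Future, which we spawn in the Context so that the acknowledgement is actually sent; map_err(drop) discards any error the acknowledgement produces. Note that the handler requests the acknowledgement before it calls process_message, so a message whose processing fails is logged but not redelivered.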

RabbitMQ requires a consumer to acknowledge every message it processes. If a consumer doesn't do this, the message stays in the queue as unacknowledged. Alternatively, you can set the no_ack field of the BasicConsumeOptions struct to true, and the message will be marked as delivered as soon as the consumer reads it. However, if your application fails before the message is processed, you will lose the message, so this mode is only suitable for non-critical messages.
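The difference between the two modes can be illustrated with a small in-memory model. This is only a sketch using the standard library: ToyQueue and all of its methods are hypothetical names invented for this example, not part of lapin or RabbitMQ, and the requeue order is simplified.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy stand-in for a broker queue (hypothetical, not the lapin API).
struct ToyQueue {
    ready: VecDeque<String>,       // messages waiting to be delivered
    unacked: HashMap<u64, String>, // delivered, but not yet acknowledged
    next_tag: u64,                 // source of delivery tags
}

impl ToyQueue {
    fn new() -> Self {
        ToyQueue {
            ready: VecDeque::new(),
            unacked: HashMap::new(),
            next_tag: 1,
        }
    }

    fn publish(&mut self, body: &str) {
        self.ready.push_back(body.to_string());
    }

    /// Delivers the next ready message together with its delivery tag.
    /// With `no_ack == true` the queue forgets the message immediately;
    /// otherwise it keeps a copy until `ack` is called for that tag.
    fn deliver(&mut self, no_ack: bool) -> Option<(u64, String)> {
        let body = self.ready.pop_front()?;
        let tag = self.next_tag;
        self.next_tag += 1;
        if !no_ack {
            self.unacked.insert(tag, body.clone());
        }
        Some((tag, body))
    }

    /// Acknowledges one delivery; returns false for an unknown tag.
    fn ack(&mut self, tag: u64) -> bool {
        self.unacked.remove(&tag).is_some()
    }

    /// Models a consumer that died: every unacknowledged message
    /// becomes ready again (appended at the back for simplicity).
    fn consumer_lost(&mut self) {
        let mut tags: Vec<u64> = self.unacked.keys().copied().collect();
        tags.sort();
        for tag in tags {
            if let Some(body) = self.unacked.remove(&tag) {
                self.ready.push_back(body);
            }
        }
    }

    /// Number of messages the queue still holds in any state.
    fn pending(&self) -> usize {
        self.ready.len() + self.unacked.len()
    }
}
```

In this model, a message delivered with no_ack set to false survives a call to consumer_lost and can be delivered again, while a message delivered with no_ack set to true is gone as soon as deliver returns.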

We use the process_message method, which we will implement later, to process a message using the QueueHandler instance. If this method returns a value other than None, we use it as a response message and send it to an outgoing queue with the send_message method, which we will also implement later in this chapter. If it returns an error, we only log a warning. But first we will add a message that initiates an outgoing message.
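The three possible outcomes of process_message (a reply, no reply, or an error) can also be matched in a single flat expression instead of an if let nested inside a match. The sketch below shows that pattern in isolation. The CorrelationId and Payload aliases, the Outcome enum, and the classify function are hypothetical names chosen for this example, and the String error type is an assumption; the chapter's actual types may differ.

```rust
// Hypothetical stand-ins for the chapter's correlation ID and message body.
type CorrelationId = String;
type Payload = Vec<u8>;

/// What the handler decides to do after processing one delivery.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Send `Payload` to the outgoing queue, tagged with `CorrelationId`.
    Reply(CorrelationId, Payload),
    /// Processing succeeded but produced nothing to send.
    NoReply,
    /// Processing failed; the handler would only log this.
    Failed(String),
}

/// Flattens the `Result<Option<..>, ..>` into one three-way decision.
fn classify(result: Result<Option<(CorrelationId, Payload)>, String>) -> Outcome {
    match result {
        Ok(Some((corr_id, data))) => Outcome::Reply(corr_id, data),
        Ok(None) => Outcome::NoReply,
        Err(err) => Outcome::Failed(err),
    }
}
```

Matching on Ok(Some(..)), Ok(None), and Err(..) directly keeps all three cases visible at the same indentation level, and the compiler checks that none of them is forgotten.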
