Request-reply communication

The following diagram provides an overview of the scenario that we will implement:

Request-reply communication

The sender will send a message to the default exchange with a routing key that matches the name of the designated request queue. The request receiver is a subscriber to the request queue. After a request message is received, the request receiver retrieves the value of the replyTo property from the message header, creates a response message, and sends it to the default exchange with a routing key that matches the replyTo property. This means that the replyTo property points to a queue that handles response messages and the sender is subscribed to that queue in order to receive a response.

Let's extend our Sender class with the following sendRequest() method, which sends a message to the default exchange using the name of the request_queue queue as the routing key, and the waitForResponse() method, which receives a message from the response_queue queue, as follows:

// Name of the request queue that sendRequest() declares.
private static final String REQUEST_QUEUE = "request_queue";
// Name of the queue set as replyTo on requests and consumed by waitForResponse().
private static final String RESPONSE_QUEUE = "response_queue";
/**
 * Publishes a request message to the default exchange, routed to the given request queue,
 * with the replyTo property pointing at the response queue.
 *
 * <p>Both the request queue and the response queue are declared before publishing.
 * An {@link IOException} raised while declaring or publishing is logged, not rethrown.
 *
 * @param requestQueue  name of the queue to route the request to; when {@code null} or
 *                      empty, {@code REQUEST_QUEUE} is used
 * @param message       text of the request; encoded as UTF-8
 * @param correlationId identifier the receiver is expected to echo back in its response;
 *                      passed through unchanged, so {@code null} leaves the property unset
 */
public void sendRequest(String requestQueue, String message, String correlationId) {
    // Honour the caller-supplied queue name instead of silently ignoring the parameter.
    final String targetQueue =
        (requestQueue == null || requestQueue.isEmpty()) ? REQUEST_QUEUE : requestQueue;
    try {
        channel.queueDeclare(targetQueue, false, false, false, null);
        channel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
        AMQP.BasicProperties amqpProps = new AMQP.BasicProperties.Builder()
            .correlationId(correlationId)
            .replyTo(RESPONSE_QUEUE)
            .build();
        // Explicit charset: the receiver may run on a platform with a different default.
        channel.basicPublish(DEFAULT_EXCHANGE, targetQueue, amqpProps,
            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    }
}

/**
 * Waits up to three seconds for one message on the response queue and returns its body
 * when the message carries the expected correlation id.
 *
 * <p>The consumer registered for the wait is cancelled before returning, so repeated
 * calls do not accumulate consumers on the channel. Errors are logged, not rethrown.
 *
 * @param correlationId correlation id of the request whose response is awaited
 * @return the response body decoded as UTF-8, or {@code null} when no message arrived in
 *         time, the message had no properties or a different correlation id, or an
 *         error occurred
 */
public String waitForResponse(final String correlationId) {
    QueueingConsumer consumer = new QueueingConsumer(channel);
    String result = null;
    String consumerTag = null;

    try {
        consumerTag = channel.basicConsume(RESPONSE_QUEUE, true, consumer);
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(3000);
        if (delivery == null) {
            // nextDelivery(timeout) yields null when the timeout elapses without a message.
            LOGGER.warn("No response received within 3000 ms.");
        } else {
            String message = new String(delivery.getBody(),
                java.nio.charset.StandardCharsets.UTF_8);
            if (delivery.getProperties() != null) {
                String msgCorrelationId = delivery.getProperties()
                    .getCorrelationId();
                // Null-safe comparison so a null expected id cannot raise an NPE.
                boolean sameRequest = (correlationId == null)
                    ? msgCorrelationId == null
                    : correlationId.equals(msgCorrelationId);
                if (!sameRequest) {
                    LOGGER.warn("Received response of another request.");
                } else {
                    result = message;
                }
            }
            LOGGER.info("Message received: " + message);
        }
    } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (ShutdownSignalException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (ConsumerCancelledException e) {
        LOGGER.error(e.getMessage(), e);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can observe the interruption.
        Thread.currentThread().interrupt();
        LOGGER.error(e.getMessage(), e);
    } finally {
        if (consumerTag != null) {
            try {
                channel.basicCancel(consumerTag);
            } catch (IOException e) {
                LOGGER.warn(e.getMessage(), e);
            } catch (ShutdownSignalException e) {
                // Channel already closed; there is nothing left to cancel.
                LOGGER.warn(e.getMessage(), e);
            }
        }
    }
    return result;
}

The sendRequest() method crafts an AMQP.BasicProperties instance and provides the replyTo and correlationId properties. The correlationId must be a unique identifier that is passed back in the response message and can be used by the sender to determine the request for which a response is received.

The RequestReceiver class provides a sample implementation of a request receiver:

/**
 * Sample request receiver: holds a connection and a channel to a broker on localhost.
 */
public class RequestReceiver {

    // Passed as the exchange argument when publishing responses; the empty string
    // designates the default exchange.
    private static final String DEFAULT_QUEUE = "";
    private static final String REQUEST_QUEUE = "request_queue";
    // Log under this class's own name (was Sender.class, a copy-paste slip).
    private final static Logger LOGGER =
        LoggerFactory.getLogger(RequestReceiver.class);
    private Connection connection = null;
    private Channel channel = null;

    /**
     * Opens a connection to the broker on localhost and creates a channel on it.
     * An {@link IOException} is logged, not rethrown; in that case the fields that
     * were not yet assigned stay {@code null}.
     */
    public void initialize() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
    . . .
}

The receive() method is used to read a request message from a queue:

    /**
     * Blocks until one request arrives on the request queue, then publishes a fixed
     * response to the queue named by the request's replyTo property, echoing the
     * request's correlation id.
     *
     * <p>The channel is initialized on demand. If no channel can be obtained, or the
     * request carries no properties or no replyTo value, the situation is logged and no
     * response is sent. Broker and I/O errors are logged, not rethrown.
     */
    public void receive() {

        if (channel == null) {
            initialize();
        }
        if (channel == null) {
            // initialize() logs its own failure and leaves the channel unset.
            LOGGER.error("Cannot receive requests: no channel to the broker is available.");
            return;
        }

        try {
            channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(REQUEST_QUEUE, true, consumer);
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody(),
                java.nio.charset.StandardCharsets.UTF_8);
            LOGGER.info("Request received: " + message);

            // do something with the request message ...

            BasicProperties properties = delivery.getProperties();
            if (properties != null && properties.getReplyTo() != null) {
                // Echo the correlation id unchanged; a missing id stays missing instead
                // of becoming the literal text "null".
                AMQP.BasicProperties amqpProps = new AMQP.BasicProperties.Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();

                channel.basicPublish(DEFAULT_QUEUE, properties.getReplyTo(), amqpProps,
                    "Response message.".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } else {
                LOGGER.warn("Cannot determine response destination for message.");
            }

        } catch (IOException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (ShutdownSignalException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (ConsumerCancelledException e) {
            LOGGER.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error(e.getMessage(), e);
        }
    }

And again we have a destroy() method – it is important to make sure that you close your connections to the broker if you are no longer using them:

    /**
     * Closes the broker connection if one was opened. A failure to close is logged
     * as a warning and not rethrown.
     */
    public void destroy() {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (IOException closeFailure) {
            LOGGER.warn(closeFailure.getMessage(), closeFailure);
        }
    }
}

In order to send a request message we can use the RequestSenderDemo class:

/**
 * Demo that sends one request to the request queue and waits for the matching response.
 */
public class RequestSenderDemo {

    private static final String REQUEST_QUEUE = "request_queue";

    /**
     * Sends the request "Test message." with correlation id "MSG1" and waits for its response.
     *
     * @return whatever {@code Sender.waitForResponse("MSG1")} returns
     */
    public static String sendToRequestReplyQueue() {
        Sender sender = new Sender();
        sender.initialize();
        try {
            sender.sendRequest(REQUEST_QUEUE, "Test message.", "MSG1");
            return sender.waitForResponse("MSG1");
        } finally {
            // Release the sender even when sending or waiting fails unexpectedly.
            sender.destroy();
        }
    }

    public static void main(String[] args) {
        sendToRequestReplyQueue();
    }
}

In order to receive the request message and send a response message, you can use the RequestReceiverDemo class:

/**
 * Demo that receives a single request, answers it, and then closes the broker connection.
 */
public class RequestReceiverDemo {

    public static void main(String[] args) throws InterruptedException {
        final RequestReceiver receiver = new RequestReceiver();
        try {
            receiver.initialize();
            receiver.receive();
        } finally {
            // Close the connection even when initialization or receiving fails unexpectedly.
            receiver.destroy();
        }
    }
}
..................Content has been hidden....................

You can't read the whole ebook; please click here and log in to view all pages.
Reset
3.138.123.106