Let's implement direct reply-to communication over AMQP by following these steps:
- Open the Login Microservice project, which is the first message consumer. Disable the previous RabbitMQConfig used for blocking send-receive communication and replace it with RabbitMQEventConfig as follows. Note that this class declares only the queue, with no exchange or binding setup:
@Configuration
@EnableWebFlux
@EnableRabbit
public class RabbitMQEventConfig {
    @Bean
    public Queue queue() {
        return new Queue("login.packt.retrieval.msg");
    }
}
- Then, disable the configuration for the asynchronous send-receive and replace it with the following configuration, which likewise declares only the request and reply queues, with no exchange binding:
@Configuration
@EnableWebFlux
@EnableRabbit
public class RabbitMQEventConfigAsync {
    @Bean
    public Queue requestQueue() {
        return new Queue("msg.request");
    }
    @Bean
    public Queue replyQueue() {
        return new Queue("msg.reply");
    }
}
- The @RabbitListener services of this project need no further changes, so save all files for later deployment.
- Go to ch11-ipc-emp, the producer of the request message, and replace the old blocking AMQP configuration with the following:
@Configuration
@EnableWebFlux
@EnableRabbit
public class RabbitMQEventConfig {
    @Bean
    public Queue queue() {
        return new Queue("login.packt.retrieval.msg");
    }
}
- The asynchronous AMQP send-receive configuration needs no changes at this point.
- For the blocking client-service implementation, replace SendRequestLogin with the following class, annotated with @RepositoryEventHandler from Spring Data REST. It publishes the request message, carrying the payload passed to send(), directly to the queue declared by queue() or requestQueue(). Its event handler method, annotated with @HandleAfterCreate, delegates to send() to perform the publish:
@Service
@RepositoryEventHandler
public class SendRequestEventLogin {
    private RabbitTemplate rabbitTemplate;
    private Queue queue;
    public SendRequestEventLogin(Queue queue, RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.queue = queue;
    }
    @HandleAfterCreate
    public LoginDetails loginHandler(String content) {
        return send(content);
    }
    public LoginDetails send(String content) {
        System.out.println("send request");
        LoginDetails results = (LoginDetails) rabbitTemplate.convertSendAndReceive(
            queue.getName(), content);
        return results;
    }
}
- Also, replace the SendAsyncLogin service class with an event handler that takes the content parameter and uses AsyncRabbitTemplate to establish broker-based communication directly with the queue:
@Service
@RepositoryEventHandler
public class SendAsyncEventLogin {
    private AsyncRabbitTemplate asyncRabbitTemplate;
    private Queue requestQueue;
    public SendAsyncEventLogin(Queue requestQueue, AsyncRabbitTemplate rabbitTemplate) {
        this.asyncRabbitTemplate = rabbitTemplate;
        // rabbitTemplate.setReceiveTimeout(1000);
        this.requestQueue = requestQueue;
    }
    @HandleAfterCreate
    public DeferredResult<LoginDetails> loginHandler(String content) {
        return send(content);
    }
    public DeferredResult<LoginDetails> send(String content) {
        System.out.println("send request");
        final DeferredResult<LoginDetails> response = new DeferredResult<>();
        ListenableFuture<LoginDetails> future = asyncRabbitTemplate.convertSendAndReceive(
            requestQueue.getName(), content);
        future.addCallback(new LoginHandlerResponse(response));
        return response;
    }
    // refer to sources
}
- Save all changes in the Employee microservice application.
- Deploy the projects. Run both the blocking and the asynchronous requests again, and compare their execution time with that of the previous two recipes. No explicit exchanges or routing keys are configured in these processes; each request is addressed to its queue by name.
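The request-reply flow built in the steps above can also be pictured without a broker. What follows is only a minimal sketch, assuming an in-memory queue in place of RabbitMQ and plain strings in place of LoginDetails. ReplyToSketch, Request, startListener(), sendAsync(), and the reply text are hypothetical names and values introduced for illustration; they are not part of the recipe's sources or of Spring AMQP. The sketch shows a request that carries its own reply handle, a listener that completes that handle, and a blocking and a non-blocking way for the sender to pick up the reply.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// In-memory stand-in for the request-reply flow; no RabbitMQ or Spring is involved.
public class ReplyToSketch {

    // A request carries its payload plus the handle on which the reply is expected,
    // playing the role of the reply-to address of a message (assumption of this sketch).
    static final class Request {
        final String content;
        final CompletableFuture<String> replyTo = new CompletableFuture<>();

        Request(String content) {
            this.content = content;
        }
    }

    // Stand-in for a named queue such as "login.packt.retrieval.msg".
    private final BlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    // Stand-in for the listener side: take each request and complete its reply handle.
    public void startListener() {
        Thread listener = new Thread(() -> {
            try {
                while (true) {
                    Request request = queue.take();
                    request.replyTo.complete("details for " + request.content);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true); // do not keep the JVM alive for the listener
        listener.start();
    }

    // Non-blocking variant: enqueue the request and return the pending reply at once.
    public CompletableFuture<String> sendAsync(String content) {
        Request request = new Request(content);
        queue.add(request);
        return request.replyTo;
    }

    // Blocking variant: wait for the reply up to the timeout.
    // Returning null when no reply arrives in time is a choice made for this sketch.
    public String send(String content, long timeoutMillis) {
        try {
            return sendAsync(content).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ReplyToSketch sketch = new ReplyToSketch();
        sketch.startListener();
        // Blocking call: the caller waits for the reply.
        System.out.println(sketch.send("user1", 1000));
        // Non-blocking call: a callback handles the reply when it arrives.
        sketch.sendAsync("admin").thenAccept(reply -> System.out.println(reply)).join();
    }
}
```

In this sketch the blocking send() is built on top of sendAsync(), which makes the difference between the two styles visible: the first waits on the reply handle, while the second registers a callback on it, much as the DeferredResult in the asynchronous service above is completed by a callback.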