Let's build non-blocking and message-driven services using the following steps:
- Let's first modify consumer of the message or the source of the response transaction, which is the Login Microservice. Add a @Configuration class that will create two queues, namely the request and reply queues. This class will be responsible for creating and binding a new routing key, packt.async, which is used for asynchronous messaging:
@Configuration @EnableWebFlux @EnableRabbit public class RabbitMQConfigAsync { @Autowired private DirectExchange exchange; @Bean public Queue requestQueue() { return new Queue("msg.request"); } @Bean public Queue replyQueue() { return new Queue("msg.reply"); } @Bean public Binding binding(DirectExchange exchange, Queue requestQueue) { return BindingBuilder.bind(requestQueue) .to(exchange).with("packt.async"); } }
- The exchange router will be the same router used in the previous recipe since there must only be one exchange router for the entire application. It is recommended that you have one exchange per application to avoid convoluted and messy bindings that will lead to exceptions or undetected services:
- Now add another @RabbitListener service, which will be executed by the msg.request queue with the exchange router's confirmation:
@Service @RabbitListener(queues = "msg.request") public class RequestAsyncLogins { @Autowired private LogindetailsService logindetailsServiceImpl; @RabbitHandler public LoginDetails process(String content) { Integer id = Integer.parseInt(content); LoginDetails details = logindetailsServiceImpl.findLoginById(id); return details ; } }
- Save all files.
- Now open ch11-ipc-emp, and add another client service that creates AsyncRestTemplate and implements a request service handler that when executed will return a Future<T> task wrapper:
@Service public class SendAsyncLogin { @Autowired private DirectExchange exchange; private AsyncRabbitTemplate asyncRabbitTemplate; public SendAsyncLogin(AsyncRabbitTemplate rabbitTemplate) { this.asyncRabbitTemplate = rabbitTemplate; } public DeferredResult<LoginDetails> send(String content) { final DeferredResult<LoginDetails> response = new DeferredResult<>(); ListenableFuture<LoginDetails> future = asyncRabbitTemplate.convertSendAndReceive( exchange.getName(), "packt.async", content); future.addCallback(new LoginHandlerResponse(response)); return response; } private static class LoginHandlerResponse implements ListenableFutureCallback<LoginDetails> { private DeferredResult<LoginDetails> result; public LoginHandlerResponse ( DeferredResult<LoginDetails> result) { this.result = result; } @Override public void onFailure(Throwable throwable) { result.setResult(new LoginDetails()); } @Override public void onSuccess(LoginDetails response) { result.setResult(response); } } }
The send() method can also return Callable<T> or WebAsyncTask<T>, depending on the asynchronous model the problem needs.
- No new AMQP-related configuration classes must be added to the application. Just add injecting the new service in AmqpController and add a new request handler method invoking the asynchronous request:
@Autowired private SendAsyncLogin sendAsyncLogin; @GetMapping(value="/amqpLoginAsync/{id}", produces = MediaType.APPLICATION_JSON_VALUE) public DeferredResult<LoginDetails> exposeGetLoginAsync(@PathVariable("id") String id) { return sendAsyncLogin.send(id); }
- Save all files. Deploy both the applications. Open a browser and run http://localhost:8095/ch10-emp/amqpLoginAsync/1 to have the login profile of an employee with an ID equivalent to 1.
- Shut down the Login Microservice by running curl -XPOST http://localhost:8094/ch11-ipc-login/appdetails/shutdown -k. Run the asynchronous service again and check the result.