Your first step towards rolling out the authentication service is to define its request and response messages. Since CCM favors JSON as a wire format, you use JSON Schema to strictly define the login request and response messages and the logout request and response messages (these schemas can be found in Appendix, Message Schemas).
With this done, you can now turn to coding and start by creating a new class unsurprisingly named AuthenticationService. It will take care of the communication with RabbitMQ and the dispatch to the internal classes that are actually performing the authentication. Let's look at the first part of the constructor of this class, where we've highlighted the interesting bits:
    private static final String INTERNAL_SERVICES_EXCHANGE = "internal-services";
    private static final String AUTHENTICATION_SERVICE_QUEUE = "authentication-service";

    public AuthenticationService(final RabbitMqManager rabbitMqManager) {
        rabbitMqManager.call(new ChannelCallable<Void>() {
            @Override
            public String getDescription() {
                return "Declaring and binding: " + AUTHENTICATION_SERVICE_QUEUE;
            }

            @Override
            public Void call(final Channel channel) throws IOException {
                channel.exchangeDeclare(INTERNAL_SERVICES_EXCHANGE,
                    "headers",
                    true,   // durable
                    false,  // auto-delete
                    null);  // arguments

                channel.queueDeclare(AUTHENTICATION_SERVICE_QUEUE,
                    false,  // durable
                    false,  // exclusive
                    true,   // auto-delete
                    null);  // arguments

                String routingKey = "";
                Map<String, Object> arguments = new HashMap<>();
                arguments.put("x-match", "all");
                arguments.put("request_type", "login");
                arguments.put("request_version", "v1");
                channel.queueBind(AUTHENTICATION_SERVICE_QUEUE,
                    INTERNAL_SERVICES_EXCHANGE, routingKey, arguments);

                // other arguments unchanged
                arguments.put("request_type", "logout");
                channel.queueBind(AUTHENTICATION_SERVICE_QUEUE,
                    INTERNAL_SERVICES_EXCHANGE, routingKey, arguments);

                return null;
            }
        });
Let's review what you've just coded. Once again, we put RabbitMqManager to good use, as it encapsulates all the logic to deal with connection and channel management. Observe how the exchange type is set to headers. Also observe how the queue is nondurable and autodeletable. As we've explained earlier, since service interactions are synchronous and short lived, there's no reason to have request messages survive a RabbitMQ restart. The queue is not exclusive because all the Java application servers will consume it simultaneously.
The binding part of this code is where the interesting things happen. A headers exchange is configured via arguments and not a routing key. This is why the routing key is an empty string. The arguments themselves are a map of key-value pairs that define the matching rules with the incoming message headers. A specific key (x-match) is used to specify whether any or all of the other key-value pairs should match.
In your case, you want to match version v1 of the login and logout message types. So, you've bound the authentication-service queue to the internal-services exchange twice as follows:
x-match=all, request_type=login, request_version=v1
x-match=all, request_type=logout, request_version=v1
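To make the matching rule concrete, here is a minimal sketch that models how these two bindings would be evaluated against message headers. It is a simplified illustration, not RabbitMQ's actual implementation: the class and method names (HeadersBindingSketch, matches, binding) are hypothetical, it assumes that x-match defaults to "all", and it only compares values for equality.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class HeadersBindingSketch {

    // Simplified model of headers-exchange matching; not RabbitMQ's actual code.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        // Assumption: "all" applies when x-match is absent or anything other than "any".
        boolean matchAll = !"any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-match itself is a rule, not a header to compare
            }
            boolean equal = headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()));
            if (matchAll && !equal) {
                return false; // "all": one mismatch is enough to reject
            }
            if (!matchAll && equal) {
                return true;  // "any": one match is enough to accept
            }
        }
        return matchAll;
    }

    // Builds the same arguments as the bindings declared by the service.
    static Map<String, Object> binding(String requestType) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-match", "all");
        args.put("request_type", requestType);
        args.put("request_version", "v1");
        return args;
    }

    public static void main(String[] argv) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("request_type", "logout");
        headers.put("request_version", "v1");
        System.out.println(matches(binding("login"), headers));  // false
        System.out.println(matches(binding("logout"), headers)); // true
    }
}
```

With x-match=all, a logout v1 message is rejected by the first binding and accepted by the second, so it reaches the queue once. A message carrying request_version=v2 would match neither binding.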
Now, let's look at how you've created the consumer for the authentication-service queue, as shown in the following code:
    rabbitMqManager.createSubscription(AUTHENTICATION_SERVICE_QUEUE,
        new SubscriptionDeliverHandler() {
            @Override
            public void handleDelivery(final Channel channel,
                                       final Envelope envelope,
                                       final BasicProperties requestProperties,
                                       final byte[] requestBody) {
                // Acknowledge first: requests are never redelivered
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (final IOException ioe) {
                    LOGGER.severe("Failed to acknowledge: "
                        + reflectionToString(envelope, SHORT_PREFIX_STYLE));
                }

                // Without a reply-to queue there is nowhere to send the response
                if (isBlank(requestProperties.getReplyTo())) {
                    LOGGER.warning("Received request without reply-to: "
                        + reflectionToString(envelope, SHORT_PREFIX_STYLE));
                    return;
                }

                handleRequest(channel, envelope, requestProperties, requestBody);
            }
        });
You shouldn't be surprised to see the re-use of the subscription mechanism that you created earlier. Since it deals with graceful reconnection, it's the right thing to do. Note how the incoming messages are acknowledged right away. It would make no sense to reject and redeliver a message in the context of a service-oriented request-response interaction. Thus, all messages are acknowledged independently, irrespective of what happens when handling them.
The next method to look at is the one in charge of deserializing and dispatching the JSON messages to the actual methods. Let's take a look at the following code:
    private void handleRequest(final Channel channel,
                               final Envelope envelope,
                               final BasicProperties requestProperties,
                               final byte[] requestBody) {
        try {
            final String contentEncoding = requestProperties.getContentEncoding();

            switch (requestProperties.getContentType()) {
                case LOGIN_REQUEST_V1_CONTENT_TYPE : {
                    final LoginRequestV1 request = OBJECT_MAPPER.readValue(
                        new String(requestBody, contentEncoding), LoginRequestV1.class);
                    final LoginResponseV1 response = login(request);
                    final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(response)
                        .getBytes(MESSAGE_ENCODING);
                    respond(channel, requestProperties,
                        LOGIN_RESPONSE_V1_CONTENT_TYPE, responseBody);
                    break;
                }
                case LOGOUT_REQUEST_V1_CONTENT_TYPE : {
                    final LogoutRequestV1 request = OBJECT_MAPPER.readValue(
                        new String(requestBody, contentEncoding), LogoutRequestV1.class);
                    final LogoutResponseV1 response = logout(request);
                    final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(response)
                        .getBytes(MESSAGE_ENCODING);
                    respond(channel, requestProperties,
                        LOGOUT_RESPONSE_V1_CONTENT_TYPE, responseBody);
                    break;
                }
                default :
                    throw new IllegalArgumentException("Unsupported message type: "
                        + requestProperties.getContentType());
            }
        } catch (final Exception e) {
            handleException(channel, envelope, requestProperties, e);
        }
    }
As you can see, the dispatching mechanism is based on the message content-type property. Looking at this, a few questions are certainly going to pop up in your head:
Why not use the request_type and request_version headers in the switch expression? It's possible, but for this, we would need to concatenate them in a string that would end up being a variation of the content-type.
Why not switch on a $schema property in our JSON payload? We could, but at the end of the day, we'd rather tell message types apart without having to parse them.
Observe that we also specifically deal with unsupported message types. The last thing we want is to silently swallow such messages. Instead, we want to make it clear to the developers and the operations team that something is not right with the system. On the other hand, valid request messages are deserialized to an object that gets passed to either the login or logout methods (which we won't detail here as their implementation is not directly relevant to our discussion).
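One detail of the dispatch code is worth spelling out. In Java, a switch on a null String throws a NullPointerException, and so does new String(byte[], String) when it is given a null charset name. A request that lacks a content type or a content encoding therefore still ends up in handleException, but with a less explicit message. The following sketch shows one possible way to make these cases explicit. The class name, the route and decode helpers, the content-type values, and the UTF-8 fallback are all assumptions made for illustration, not part of the service shown above.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class ContentTypeDispatchSketch {

    // Hypothetical values: the actual content-type constants are not shown in the text.
    static final String LOGIN_REQUEST_V1_CONTENT_TYPE =
            "application/vnd.example.login.req.v1+json";
    static final String LOGOUT_REQUEST_V1_CONTENT_TYPE =
            "application/vnd.example.logout.req.v1+json";

    // Decodes the body, falling back to UTF-8 (an assumption) when no encoding is given.
    static String decode(byte[] body, String contentEncoding) {
        Charset charset = contentEncoding == null
                ? StandardCharsets.UTF_8
                : Charset.forName(contentEncoding);
        return new String(body, charset);
    }

    // Returns the name of the operation that would handle the request.
    static String route(String contentType) {
        if (contentType == null) {
            // Fail with a clear message instead of a NullPointerException from the switch
            throw new IllegalArgumentException("Missing content type");
        }
        switch (contentType) {
            case LOGIN_REQUEST_V1_CONTENT_TYPE:
                return "login";
            case LOGOUT_REQUEST_V1_CONTENT_TYPE:
                return "logout";
            default:
                throw new IllegalArgumentException("Unsupported message type: " + contentType);
        }
    }
}
```

Because both failure cases still throw, they would flow through the same catch block as any other error, so the caller would receive an error response either way. The only difference is a clearer message in the logs and in the reply.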
Be strict and deserialize request and response JSON messages straight to objects with internal services. Different version numbers will allow you to evolve gracefully. Conversely, if you expose public services over AMQP, be lax with the request messages (do not bind them to objects), but stay strict with your response messages (serialize them from objects). This will cut some slack for external users who may lack the discipline or understanding needed when rigorously dealing with schema versions.
The objects returned from these methods are then sent to the reply queue using the respond method, which is listed in the following code with the important lines highlighted:
    private void respond(final Channel channel,
                         final BasicProperties requestProperties,
                         final String responseContentType,
                         final byte[] responseBody) throws IOException {
        final String messageId = UUID.randomUUID().toString();

        final BasicProperties props = new BasicProperties.Builder()
            .contentType(responseContentType)
            .contentEncoding(MESSAGE_ENCODING)
            .messageId(messageId)
            .correlationId(requestProperties.getCorrelationId())
            .deliveryMode(1)
            .build();

        channel.basicPublish("", requestProperties.getReplyTo(), props, responseBody);
    }
The notable aspects of the respond method are as follows:
The correlation ID of the request message is propagated to the response message.
The delivery mode is set to 1, which indicates nonpersistence. Once again, this is because of the transient nature of request-response interactions.
The response is published to the default exchange (whose name is an empty string), with the reply-to value of the request as the routing key.
Finally, let's take a look at the handleException method, which is called whenever anything goes wrong while handling a request message, whether because the request message can't be deserialized, its type is unknown, or the method being called ends up throwing an exception. This is shown in the following code:
    private void handleException(final Channel channel,
                                 final Envelope envelope,
                                 final BasicProperties requestProperties,
                                 final Exception e1) {
        LOGGER.log(SEVERE, "Failed to handle: "
            + reflectionToString(envelope, SHORT_PREFIX_STYLE), e1);

        try {
            final ErrorV1 error = new ErrorV1()
                .withContext(reflectionToString(envelope, SHORT_PREFIX_STYLE))
                .withMessage(e1.getMessage());
            final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(error)
                .getBytes(MESSAGE_ENCODING);
            respond(channel, requestProperties, ERROR_V1_CONTENT_TYPE, responseBody);
        } catch (final Exception e2) {
            LOGGER.log(SEVERE, "Failed to respond error for: "
                + reflectionToString(envelope, SHORT_PREFIX_STYLE), e2);
        }
    }
Observe how a generic error message is used as the service response when an exception occurs. This is a very simple and powerful way of propagating potential issues back to the caller of a message-oriented service. Since this is an internal service, it is perfectly fine if the error message carries lots of contextual information.
You're done with the implementation of the authentication service. After deploying the preceding code, the internal-services exchange and the authentication-service queue are created. By looking at the bindings of the latter in the management console, you can visually confirm that the correct bindings and headers that match the specified rules are in place, as shown in the following screenshot:
Now that the service is ready, it's time to write a client that interacts with it.