Exposing the authentication service

Your first move towards rolling out the authentication service is to create the definition of the request and response messages used for the authentication service. Since CCM favors JSON as a wire format, you're using the JSON schema to strictly define the schemas of the login request and response messages and the logout request and response messages (these schemas can be found in Appendix, Message Schemas).

With this done, you can now turn to coding and start by creating a new class unsurprisingly named AuthenticationService. It will take care of the communication with RabbitMQ and the dispatch to the internal classes that are actually performing the authentication. Let's look at the first part of the constructor of this class, where we've highlighted the interesting bits:

private static final String INTERNAL_SERVICES_EXCHANGE = "internal-services";
private static final String AUTHENTICATION_SERVICE_QUEUE = "authentication-service";

public AuthenticationService(final RabbitMqManager rabbitMqManager)
{
    rabbitMqManager.call(new ChannelCallable<Void>()
    {
        @Override
        public String getDescription()
        {
            return "Declaring and binding: " + AUTHENTICATION_SERVICE_QUEUE;
        }

        @Override
        public Void call(final Channel channel) throws IOException
        {
            channel.exchangeDeclare(INTERNAL_SERVICES_EXCHANGE,
                "headers",
                true, // durable
                false, // auto-delete
                null); // arguments

            channel.queueDeclare(AUTHENTICATION_SERVICE_QUEUE,
                false, // durable
                false, // exclusive,
                true, // auto-delete
                null); // arguments
            String routingKey = "";
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-match", "all");
            arguments.put("request_type", "login");
            arguments.put("request_version", "v1");
            channel.queueBind(AUTHENTICATION_SERVICE_QUEUE, INTERNAL_SERVICES_EXCHANGE, routingKey, arguments);

            // other arguments unchanged
            arguments.put("request_type", "logout");
            channel.queueBind(AUTHENTICATION_SERVICE_QUEUE, INTERNAL_SERVICES_EXCHANGE, routingKey, arguments);

            return null;
        }
    });

Let's review what you've just coded. Once again, we put RabbitMqManager to good use, as it encapsulates all the logic to deal with connection and channel management. Observe how the exchange type is set to headers. Also observe how the queue is nondurable and autodeletable. As we've explained earlier, since service interactions are synchronous and short lived, there's no reason to have request messages survive a RabbitMQ restart. The queue is not exclusive because all the Java application servers will consume it simultaneously.

The binding part of this code is where the interesting things happen. A headers exchange is configured via arguments and not a routing key. This is why the routing key is an empty string. The arguments themselves are a map of key values that define the matching rules with the incoming message headers. A specific key (x-match) is used to specify whether any or all of the other key-value pairs should match.

In your case, you want to match the key with the version v1 of the login and logout types of messages. So, you've bound the authentication-service queue to the internal-services exchange twice as follows:

  • Once with x-match=all, request_type=login, request_version=v1
  • Once with x-match=all, request_type=logout, request_version=v1

Now, let's look at how you've created the consumer for the authentication-service queue, as shown in the following code:

rabbitMqManager.createSubscription(AUTHENTICATION_SERVICE_QUEUE, new SubscriptionDeliverHandler()
{
    @Override
    public void handleDelivery(final Channel channel,
                               final Envelope envelope,
                               final BasicProperties requestProperties,
                               final byte[] requestBody)
    {
        try
        {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
        catch (final IOException ioe)
        {
            LOGGER.severe("Failed to acknowledge: "
                          + reflectionToString(envelope, SHORT_PREFIX_STYLE));
        }

        if (isBlank(requestProperties.getReplyTo()))
        {
            LOGGER.warning("Received request without reply-to: "
                           + reflectionToString(envelope, SHORT_PREFIX_STYLE));
            return;
        }

        handleRequest(channel, envelope, requestProperties, requestBody);
    }

You shouldn't be surprised to see the re-use of the subscription mechanism that you created earlier. Since it deals with graceful reconnection, it's the right thing to do. Note how the incoming messages are acknowledged right away. It would make no sense to reject and redeliver a message in the context of a service-oriented request-response interaction. Thus, all messages are acknowledged independently, irrespective of what happens when handling them.

Tip

The RabbitMQ Java SDK contains helper classes to create RPC clients and servers, including those with JSON-serialized messages. Consider using them, but make sure you understand their behavior in case of disconnection and agree with the message semantics they use.

The next method to look at is the one in charge of deserializing and dispatching the JSON messages to the actual methods. Let's take a look at the following code:

private void handleRequest(final Channel channel,
                           final Envelope envelope,
                           final BasicProperties requestProperties,
                           final byte[] requestBody)
{
    try
    {
        final String contentEncoding = requestProperties.getContentEncoding();

        switch (requestProperties.getContentType())
        {
            case LOGIN_REQUEST_V1_CONTENT_TYPE :
            {
                final LoginRequestV1 request = OBJECT_MAPPER.readValue(new String(requestBody,
                    contentEncoding), LoginRequestV1.class);
                final LoginResponseV1 response = login(request);
                final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(response).getBytes(
                    MESSAGE_ENCODING);
                respond(channel, requestProperties, LOGIN_RESPONSE_V1_CONTENT_TYPE, responseBody);
                break;
            }

            case LOGOUT_REQUEST_V1_CONTENT_TYPE :
            {
                final LogoutRequestV1 request = OBJECT_MAPPER.readValue(new String(requestBody,
                    contentEncoding), LogoutRequestV1.class);
                final LogoutResponseV1 response = logout(request);
                final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(response).getBytes(
                    MESSAGE_ENCODING);
                respond(channel, requestProperties, LOGOUT_RESPONSE_V1_CONTENT_TYPE, responseBody);
                break;
            }
            default :
                throw new IllegalArgumentException("Unsupported message type: " + requestProperties.getContentType());
        }
    }
    catch (final Exception e)
    {
        handleException(channel, envelope, requestProperties, e);
    }
}

As you can see, the dispatching mechanism is based on the message content-type property. Looking at this, three questions are certainly going to pop up in your head:

  • Why not use the content-type property in the matching rule of the headers exchange? The answer is that it's just impossible, as matching rules only apply to custom message headers and not to any of the built-in message properties.
  • Why not use the request_type and request_version headers in the switch expression? It's possible, but for this, we would need to concatenate them in a string that would end up being a variation of the content-type.
  • Why not peek in the message content itself to find out its type? If we are using XML instead of JSON, we would use specific namespaces for that matter. JSON doesn't support that notion. One could argue that we could use a $schema property in our JSON payload and switch to it, but at the end of the day, we'd rather tell message types apart without having to parse them.

Observe that we also specifically deal with unsupported message types. The last thing we want is to silently swallow such messages. Instead, we want to make it clear to the developers and the operation team that something is not right with the system. On the other hand, valid request messages are deserialized to an object that gets passed to either the login or logout methods (which we won't detail here as their implementation is not directly relevant to our discussion).

Tip

Be strict and deserialize request and response JSON messages straight to objects with internal services. Different version numbers will allow you to evolve gracefully. Conversely, if you expose public services over AMQP, be lax with the request messages (do not bind them to objects), but stay strict with your response messages (serialize them from objects). This will cut some slack for external users who may lack the discipline or understanding needed when rigorously dealing with schema versions.

The objects returned from these methods are then sent to the reply queue using the respond method, which is listed in the following code with the important lines highlighted:

private void respond(final Channel channel,
                     final BasicProperties requestProperties,
                     final String responseContentType,
                     final byte[] responseBody) throws IOException
{
    final String messageId = UUID.randomUUID().toString();

    final BasicProperties props = new BasicProperties.Builder()
        .contentType(responseContentType)
        .contentEncoding(MESSAGE_ENCODING)
        .messageId(messageId)
        .correlationId(requestProperties.getCorrelationId())
        .deliveryMode(1)
        .build();

    channel.basicPublish("", requestProperties.getReplyTo(), props, responseBody);
}

The notable aspects of the respond method are as follows:

  • The correlation-id of the request message is propagated to the response message. Though this is not required in your case, since you'll be using temporary reply queues, it is a good practice to do so. Moreover, it opens the door to switch to a permanent response queue should performance issues arise with the short-lived queues.
  • The delivery-mode is set to 1, which indicates nonpersistence. Once again, this is because of the transient nature of request-response interactions.
  • The response is published to the default exchange, the name of which is an empty string. The routing key used for the publication is the name of the reply queue, which is stored in the reply-to property for the request message.

Finally, let's take a look at the handleException method, which is called whenever anything goes wrong while handling a request message, whether this is done because the request message can't be deserialized, its type is unknown, or if the actual method being called ends up throwing an exception. This is shown in the following code:

private void handleException(final Channel channel,
                             final Envelope envelope,
                             final BasicProperties requestProperties,
                             final Exception e1)
{
    LOGGER.log(SEVERE, "Failed to handle: " + reflectionToString(envelope, SHORT_PREFIX_STYLE),
        e1);

    try
    {
        final ErrorV1 error = new ErrorV1()
            .withContext( reflectionToString(envelope, SHORT_PREFIX_STYLE))
            .withMessage(e1.getMessage());
        final byte[] responseBody = OBJECT_MAPPER.writeValueAsString(error).getBytes(
            MESSAGE_ENCODING);
        respond(channel, requestProperties, ERROR_V1_CONTENT_TYPE, responseBody);
    }
    catch (final Exception e2)
    {
        LOGGER.log(SEVERE,
            "Failed to respond error for: " + reflectionToString(envelope, SHORT_PREFIX_STYLE),
            e2);
    }
}

Observe how a generic error message is used as the service response when an exception occurs. This is a very simple and powerful way of propagating potential issues back to the caller of a message-oriented service. Since this is an internal service, it is perfectly fine if the error message carries lots of contextual information.

You're done with the implementation of the authentication service. After deploying the preceding code, the internal-services exchange and the authentication-service queue is created. By looking at the bindings of the latter in the management console, you can visually confirm that the correct bindings and headers that match the specified rules are in place, as shown in the following screenshot:

Exposing the authentication service

The routing and matching rules of a queue bound twice to a headers exchange

Now that the service is ready, it's time to write a client that interacts with it.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.77.250