Tracking activity asynchronously with AMQP

In this section, we will implement activity tracking using RabbitMQ (https://www.rabbitmq.com), a message broker that implements AMQP. To do so, we need to refactor the domain event publishing and listening mechanism that we implemented earlier and remove the code based on Spring's application events.

At a high level, the new domain event publisher, AmqpDomainEventPublisher, will send domain events to a RabbitMQ exchange. This is a fan-out exchange, to which one or more queues are bound. When the exchange receives a message, it broadcasts the message to every queue bound to it, ignoring the routing key, and the consumers listening to those queues receive the message. Figure 13.7 shows the message flow when AMQP is used to publish and consume domain events:

Figure 13.7: AMQP publisher and consumers
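To make the fan-out behavior concrete, here is a minimal in-memory sketch in plain Java. It is not RabbitMQ or Spring AMQP code: InMemoryQueue, InMemoryFanoutExchange, and the queue name "ta.hypothetical.queue" are hypothetical names introduced only for this illustration. It shows just the routing rule, that is, every bound queue gets a copy of each published message and the routing key plays no part.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a queue; it only stores routed messages.
class InMemoryQueue {
  private final String name;
  private final List<String> messages = new ArrayList<>();

  InMemoryQueue(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }

  void enqueue(String message) {
    messages.add(message);
  }

  List<String> getMessages() {
    return messages;
  }
}

// Hypothetical in-memory stand-in for a fan-out exchange.
class InMemoryFanoutExchange {
  private final List<InMemoryQueue> boundQueues = new ArrayList<>();

  void bind(InMemoryQueue queue) {
    boundQueues.add(queue);
  }

  // The routing key is accepted but ignored: every bound queue gets a copy.
  void publish(String routingKey, String message) {
    for (InMemoryQueue queue : boundQueues) {
      queue.enqueue(message);
    }
  }
}

class FanoutDemo {
  public static void main(String[] args) {
    InMemoryFanoutExchange exchange = new InMemoryFanoutExchange();
    InMemoryQueue tracking = new InMemoryQueue("ta.activity.tracking");
    InMemoryQueue other = new InMemoryQueue("ta.hypothetical.queue");
    exchange.bind(tracking);
    exchange.bind(other);

    // One publish call, one copy of the message in each bound queue
    exchange.publish("", "CardAddedEvent");
    System.out.println(tracking.getName() + " -> " + tracking.getMessages());
    System.out.println(other.getName() + " -> " + other.getMessages());
  }
}
```

In this model, adding another consumer of domain events only requires binding one more queue to the exchange; the publisher does not change.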

The following shows com.taskagile.config.MessageConfiguration, where we configure the exchange, queue, and binding:

...
@Configuration
public class MessageConfiguration {

  @Bean
  public FanoutExchange domainEventsExchange() {
    return new FanoutExchange("ta.domain.events", true, false);
  }

  @Bean
  public Queue activityTrackingQueue() {
    return new Queue("ta.activity.tracking", true);
  }

  @Bean
  public Binding bindingActivityTracking(FanoutExchange exchange, Queue activityTrackingQueue) {
    return BindingBuilder.bind(activityTrackingQueue).to(exchange);
  }
}

As you can see, the domainEventsExchange() method creates a durable FanoutExchange named "ta.domain.events". The second constructor argument, true, makes the exchange durable, and the third, false, turns off auto-delete. The activityTrackingQueue() method creates a durable queue named "ta.activity.tracking", and the bindingActivityTracking() method binds that queue to the exchange.

The following is how AmqpDomainEventPublisher looks:

...
@Component
public class AmqpDomainEventPublisher implements DomainEventPublisher {
  ...
  private RabbitTemplate rabbitTemplate;
  private FanoutExchange exchange;

  public AmqpDomainEventPublisher(RabbitTemplate rabbitTemplate,
      @Qualifier("domainEventsExchange") FanoutExchange exchange) {
    this.rabbitTemplate = rabbitTemplate;
    this.exchange = exchange;
  }

  @Override
  public void publish(DomainEvent event) {
    log.debug("Publishing domain event: " + event);
    try {
      rabbitTemplate.convertAndSend(exchange.getName(), "", event);
    } catch (AmqpException e) {
      log.error("Failed to send domain event to MQ", e);
    }
  }
}

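In this publisher, we inject a RabbitTemplate instance and the domainEventsExchange bean through the constructor. Inside the publish() method, we call the convertAndSend() method of rabbitTemplate with the exchange name, an empty routing key (a fan-out exchange ignores it), and the domain event, which is sent as a RabbitMQ message. If sending fails with an AmqpException, the error is logged and not rethrown, so a messaging failure does not propagate to the caller of publish().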
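The "convert" part of convertAndSend() is handled by the template's message converter. As a rough illustration, and assuming that the RabbitTemplate keeps its default converter (SimpleMessageConverter) and that DomainEvent implements Serializable (neither is shown in this section), the conversion amounts to Java serialization on the publishing side and deserialization on the listening side. The following plain-Java sketch shows only that round trip; CardAddedEventSketch and EventSerializationSketch are hypothetical names, not classes from the application or from Spring.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical event, used only for this illustration
class CardAddedEventSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  private final long cardId;
  private final String title;

  CardAddedEventSketch(long cardId, String title) {
    this.cardId = cardId;
    this.title = title;
  }

  long getCardId() {
    return cardId;
  }

  String getTitle() {
    return title;
  }
}

class EventSerializationSketch {

  // Roughly what happens before the message body is sent (assumption:
  // Java serialization is the conversion strategy in use)
  static byte[] toBytes(Serializable event) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(event);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return buffer.toByteArray();
  }

  // Roughly what happens before the listener method is invoked
  static Object fromBytes(byte[] body) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Event class is not on the classpath", e);
    }
  }

  public static void main(String[] args) {
    byte[] body = toBytes(new CardAddedEventSketch(42L, "Write chapter 13"));
    CardAddedEventSketch received = (CardAddedEventSketch) fromBytes(body);
    System.out.println(received.getCardId() + ": " + received.getTitle());
  }
}
```

One consequence of this strategy, if it is the one in use, is that the publisher and every consumer need compatible versions of the event classes on their classpaths.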

Listening to a queue is quite simple with Spring. The following shows ActivityTracker, which listens to the activity tracking queue that we defined in the configuration:

@Component
public class ActivityTracker {
  ...
  private ActivityService activityService;
  private DomainEventToActivityConverter
    domainEventToActivityConverter;

  public ActivityTracker(ActivityService activityService,
                         DomainEventToActivityConverter
                           domainEventToActivityConverter) {
    this.activityService = activityService;
    this.domainEventToActivityConverter =
      domainEventToActivityConverter;
  }

  @RabbitListener(queues = "#{activityTrackingQueue.name}")
  public void receive(DomainEvent domainEvent) {
    log.debug("Receive domain event: " + domainEvent);

    Activity activity =
      domainEventToActivityConverter.toActivity(domainEvent);
    // Save the activity only when there is an activity
    // result from the domain event
    if (activity != null) {
      activityService.saveActivity(activity);
    }
  }
}

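As you can see, ActivityTracker is a simple Spring bean. The only special part is that the receive() method is annotated with the @RabbitListener annotation, whose queues attribute uses the SpEL expression #{activityTrackingQueue.name} to take the queue name from the activityTrackingQueue bean. With this annotation, Spring invokes receive() for each message that arrives on that queue and automatically converts the message to a DomainEvent object. DomainEventToActivityConverter converts the received domain event into a corresponding activity so that ActivityService can save it. When an event has no corresponding activity, the converter returns null and nothing is saved.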
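The convert-then-save-if-present flow can be sketched in plain Java as follows. This is not the application's DomainEventToActivityConverter, whose implementation is not shown in this section; SketchEvent, SketchActivity, SketchConverter, SketchTracker, and the event type strings are all hypothetical and exist only to illustrate the null check in receive().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying a type tag and some detail text
class SketchEvent {
  final String type;
  final String detail;

  SketchEvent(String type, String detail) {
    this.type = type;
    this.detail = detail;
  }
}

// Hypothetical activity record
class SketchActivity {
  final String description;

  SketchActivity(String description) {
    this.description = description;
  }
}

class SketchConverter {
  // Returns an activity for events worth tracking, or null otherwise
  static SketchActivity toActivity(SketchEvent event) {
    if ("card-added".equals(event.type)) {
      return new SketchActivity("Added card: " + event.detail);
    }
    return null;
  }
}

class SketchTracker {
  // Stands in for the persistence done by an activity service
  private final List<SketchActivity> saved = new ArrayList<>();

  void receive(SketchEvent event) {
    SketchActivity activity = SketchConverter.toActivity(event);
    // Save only when the event maps to an activity
    if (activity != null) {
      saved.add(activity);
    }
  }

  List<SketchActivity> getSaved() {
    return saved;
  }

  public static void main(String[] args) {
    SketchTracker tracker = new SketchTracker();
    tracker.receive(new SketchEvent("card-added", "Write chapter 13"));
    tracker.receive(new SketchEvent("user-logged-in", "not tracked in this sketch"));
    System.out.println("Saved activities: " + tracker.getSaved().size());
  }
}
```

Returning null keeps the listener simple, at the cost of an explicit check; returning java.util.Optional from the converter would be an alternative design.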

The following is the commit history of implementing activity tracking:

Figure 13.8: The commit for implementing activity tracking with AMQP