Consuming Kafka messages

The message-consuming part encapsulates the message hub from the rest of the application and integrates with it by firing CDI events for arriving messages. This approach is specific to the Kafka API and should be considered one example solution.

The updating consumer connects to a specific topic via its consumer group. The startup singleton bean ensures that the consumer is initiated at application startup. A container-managed executor service runs the event consumer in its own thread:

import java.util.Properties;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.enterprise.concurrent.ManagedExecutorService;
import javax.enterprise.event.Event;
import javax.inject.Inject;

@Startup
@Singleton
public class OrderUpdateConsumer {

    private EventConsumer eventConsumer;

    @Resource
    ManagedExecutorService mes;

    @Inject
    Properties kafkaProperties;

    @Inject
    Event<MealEvent> events;

    @PostConstruct
    private void init() {
        String orders = kafkaProperties.getProperty("topic.orders");

        eventConsumer = new EventConsumer(kafkaProperties,
                ev -> events.fire(ev), orders);
        mes.execute(eventConsumer);
    }

    @PreDestroy
    public void close() {
        eventConsumer.stop();
    }
}

The application-specific Kafka properties are exposed via a CDI producer. They contain the corresponding consumer groups.
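For illustration, such a producer can be sketched in plain Java. The CDI `@Produces` annotation is omitted so the snippet stands alone, and all property keys and values apart from `topic.orders` are assumptions for this example, not taken from the book's configuration:

```java
import java.util.Properties;

public class KafkaConfiguration {

    // In the application this method would be a CDI producer method
    // (annotated with @Produces); it is shown as plain Java here.
    // Broker address and group name are illustrative assumptions.
    public static Properties kafkaProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "kafka:9092");
        // the consumer group of the updating consumer
        properties.put("group.id", "order-updates");
        // offsets are committed manually after the CDI events were handled
        properties.put("enable.auto.commit", "false");
        // application-specific key, read in OrderUpdateConsumer#init()
        properties.put("topic.orders", "orders");
        return properties;
    }
}
```

Injecting the whole `Properties` object keeps the consumer code free of individual configuration lookups.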

The event consumer performs the actual consumption:

import static java.util.Arrays.asList;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class EventConsumer implements Runnable {

    private final KafkaConsumer<String, MealEvent> consumer;
    private final Consumer<MealEvent> eventConsumer;
    private final AtomicBoolean closed = new AtomicBoolean();

    public EventConsumer(Properties kafkaProperties,
            Consumer<MealEvent> eventConsumer, String... topics) {
        this.eventConsumer = eventConsumer;
        consumer = new KafkaConsumer<>(kafkaProperties);
        consumer.subscribe(asList(topics));
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                consume();
            }
        } catch (WakeupException e) {
            // will wake up for closing
        } finally {
            consumer.close();
        }
    }

    private void consume() {
        ConsumerRecords<String, MealEvent> records =
                consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, MealEvent> record : records) {
            eventConsumer.accept(record.value());
        }
        consumer.commitSync();
    }

    public void stop() {
        closed.set(true);
        consumer.wakeup();
    }
}

Consumed Kafka records result in new CDI events. The configured properties specify JSON serializers and deserializers, respectively, that map the domain event classes.
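The corresponding entries in the consumer configuration could look as follows. The deserializer class name for the record values is hypothetical; only `StringDeserializer` ships with the Kafka client:

```properties
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# hypothetical JSON deserializer mapping record values to MealEvent
value.deserializer=com.example.orders.MealEventDeserializer
```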

Only after the CDI events have been fired and handled successfully are the corresponding records committed to Kafka. The CDI events are fired synchronously, to ensure that all processing finishes reliably before the offsets are committed.
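The effect of this ordering can be illustrated with a plain-Java sketch, with no Kafka or CDI involved and all names made up for the example: the offset commit only happens after every synchronous handler has returned, so a failing handler leads to redelivery rather than lost events (at-least-once semantics).

```java
import java.util.List;
import java.util.function.Consumer;

public class CommitOrdering {

    // Returns true if the offset may be committed, i.e. all handlers
    // ran to completion; mirrors the consume() method above in spirit.
    static boolean consume(List<String> records, Consumer<String> handler) {
        try {
            for (String record : records) {
                handler.accept(record); // synchronous, like Event#fire
            }
        } catch (RuntimeException e) {
            return false; // no commit -> records will be polled again
        }
        return true; // all handlers finished -> safe to commitSync()
    }
}
```

With asynchronous event delivery, the commit could happen before the observers finished, silently dropping events on failure.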
