Java PlainProcessor

Now, in the src/main/java/kioto/plain directory, create a file called PlainProcessor.java with the content of Listing 4.9.

The following is the content of Listing 4.9, PlainProcessor.java (part 1):

package kioto.plain;

import java.io.IOException;
import java.time.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import kioto.Constants;
import kioto.HealthCheck;

public final class PlainProcessor {
  private Consumer<String, String> consumer;
  private Producer<String, String> producer;

  public PlainProcessor(String brokers) {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", brokers);
    consumerProps.put("group.id", "healthcheck-processor");
    consumerProps.put("key.deserializer", StringDeserializer.class);
    consumerProps.put("value.deserializer", StringDeserializer.class);
    consumer = new KafkaConsumer<>(consumerProps);

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", brokers);
    producerProps.put("key.serializer", StringSerializer.class);
    producerProps.put("value.serializer", StringSerializer.class);
    producer = new KafkaProducer<>(producerProps);
  }

Listing 4.9: PlainProcessor.java (part 1)

An analysis of the first part of the PlainProcessor class includes the following:

  • In the first part, we declare a consumer, as in Listing 4.8
  • In the second part, we declare a producer, as in Listing 4.6

Before continuing to write code, let's remember the project requirements for the Kioto stream processing engine.

Putting it all together, the specification is to create a stream engine that does the following:

  • Generates messages to a Kafka topic called healthchecks
  • Reads messages from the Kafka topic called healthchecks
  • Calculates the uptime based on the startup time
  • Writes the messages in a Kafka topic called uptimes

This entire process is detailed in Figure 4.1, which shows the Kioto stream processing application:

Figure 4.1: The messages are generated into the healthchecks topic, then read, and finally the calculated uptimes are written to the uptimes topic.
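Listing 4.10 below also relies on the HealthCheck bean and the Constants helper created earlier in the chapter. As a refresher, here is a minimal sketch inferred from how they are used in Listing 4.10; the real definitions (including the ObjectMapper configuration and the remaining HealthCheck fields) live in the earlier listings:

// src/main/java/kioto/Constants.java (sketch)
package kioto;

import com.fasterxml.jackson.databind.ObjectMapper;

public final class Constants {
  private static final ObjectMapper jsonMapper = new ObjectMapper();
  public static String getHealthChecksTopic() { return "healthchecks"; }
  public static String getUptimesTopic() { return "uptimes"; }
  public static ObjectMapper getJsonMapper() { return jsonMapper; }
}

// src/main/java/kioto/HealthCheck.java (sketch)
package kioto;

import java.util.Date;

public class HealthCheck {
  private String serialNumber;
  private Date lastStartedAt;
  // ... other fields, getters, and setters elided ...
  public String getSerialNumber() { return serialNumber; }
  public Date getLastStartedAt() { return lastStartedAt; }
}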

Now that we're in the src/main/java/kioto/plain directory, let's complete the PlainProcessor.java file with the content of Listing 4.10.

The following is the content of Listing 4.10, PlainProcessor.java (part 2):

  public final void process() {
    consumer.subscribe(Collections.singletonList(
        Constants.getHealthChecksTopic()));                            //1
    while (true) {
      ConsumerRecords<String, String> records =
          consumer.poll(Duration.ofSeconds(1L));                       //2
      for (ConsumerRecord<String, String> record : records) {          //3
        String healthCheckJson = record.value();
        HealthCheck healthCheck;
        try {
          healthCheck = Constants.getJsonMapper()
              .readValue(healthCheckJson, HealthCheck.class);          //4
        } catch (IOException e) {
          // deal with the exception, then skip the malformed record
          continue;
        }
        LocalDate startDateLocal = healthCheck.getLastStartedAt()
            .toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); //5
        int uptime =
            Period.between(startDateLocal, LocalDate.now()).getDays(); //6
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<>(
                Constants.getUptimesTopic(),
                healthCheck.getSerialNumber(),
                String.valueOf(uptime)));                              //7
        try {
          future.get();
        } catch (InterruptedException | ExecutionException e) {
          // deal with the exception
        }
      }
    }
  }

  public static void main(String[] args) {
    new PlainProcessor("localhost:9092").process();
  }
}
Listing 4.10: PlainProcessor.java (part 2)

An analysis of the PlainProcessor includes the following:

  • In line //1, the consumer subscribes to the source topic. This is a dynamic assignment: partitions are assigned to our consumer automatically when it joins the consumer group.
  • In line //2, an infinite loop consumes the records; the poll duration is passed as a parameter to the poll() method. The consumer waits no longer than one second before returning.
  • In line //3, we iterate over the records.
  • In line //4, the JSON string is deserialized into a HealthCheck object.
  • In line //5, the start time is converted to a LocalDate in the current time zone.
  • In line //6, the uptime is calculated (see the note after this list).
  • In line //7, the uptime is written to the uptimes topic, using the serial number as the key and the uptime as the value. Both values are written as plain strings.
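A caveat about line //6: Period.getDays() returns only the days component of the period (the 9 in, say, "1 month and 9 days"), not the total number of elapsed days, so the two diverge once a host has been up for more than a month. If the total day count is what is intended, java.time.temporal.ChronoUnit is an alternative. A minimal, self-contained sketch (the class and variable names here are ours, for illustration only):

import java.time.LocalDate;
import java.time.Period;
import java.time.temporal.ChronoUnit;

public class UptimeCalculation {
  public static void main(String[] args) {
    LocalDate started = LocalDate.now().minusDays(40);
    // Days component of the period, e.g. the 9 in "1 month and 9 days"
    int daysComponent = Period.between(started, LocalDate.now()).getDays();
    // Total elapsed days: always 40 here
    long totalDays = ChronoUnit.DAYS.between(started, LocalDate.now());
    System.out.println(daysComponent + " vs " + totalDays);
  }
}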

The moment at which the broker returns records to the client also depends on the fetch.min.bytes value; its default is 1, and it is the minimum amount of data that must be available before the broker answers the client. With our configuration, the broker returns as soon as 1 byte of data is available, while the consumer waits a maximum of one second.

The other configuration property is fetch.max.bytes, which defines the maximum amount of data the broker returns at once. With our configuration, the broker will return all of the available records, without exceeding the default maximum of 50 MB.
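Both properties can be set alongside the other consumer properties of Listing 4.9. The following fragment is only a sketch, and the values shown are illustrative, not recommendations:

// Additional consumer properties, e.g., in the constructor of Listing 4.9:
consumerProps.put("fetch.min.bytes", 1024);     // answer fetches only once 1 KB is available
consumerProps.put("fetch.max.bytes", 1048576);  // cap a single fetch response at 1 MB
consumer = new KafkaConsumer<>(consumerProps);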

If there are no records available, the broker returns an empty list.

Note that we could reuse the producer that generates the mock data, but it is clearer to use another producer to write uptimes.
