Running the engine

Now that the final version of the Enricher class is coded, we have to compile and execute it.

As we know, the ProcessingEngine class contains the main method to coordinate the reader and writer classes. Now, let's modify the ProcessingEngine.java file on the src/main/java/monedero/ directory and replace Validator with Enricher as in the highlighted code in Listing 3.6:

package monedero;
public class ProcessingEngine {
public static void main(String[] args){
String servers = args[0];
String groupId = args[1];
String sourceTopic = args[2];
String validTopic = args[3];
String invalidTopic = args[4];
Reader reader = new Reader(servers, groupId, sourceTopic);
Enricher enricher = new Enricher(servers, validTopic, invalidTopic);
reader.run(enricher);
}
}
Listing 3.6: ProcessingEngine.java

The processing engine receives the following five arguments from the command line:

  • args[0] servers indicates the host and port of the Kafka broker
  • args[1] groupId indicates that the consumer is part of this Kafka consumer group
  • args[2] input topic indicates the topic where the reader reads from
  • args[3] validTopic indicates the topic where valid messages are sent
  • args[4] invalidTopic indicates the topic where invalid messages are sent

To rebuild the project from the monedero directory, run the following command:

$ gradle jar

If everything is OK, the output should be similar to the following:

...
BUILD SUCCESSFUL in 8s
2 actionable tasks: 2 executed

To run the project, we need four different command-line windows. Figure 3.3 shows the command-line windows arrangement:

Figure 3.3: The four terminal windows to test the processing engine including: message producer, valid message consumer, invalid message consumer, and the processing engine itself
  1. In the first command-line terminal, go to the Kafka installation directory and generate the two necessary topics, as follows:
$ bin/kafka-topics --create --zookeeper localhost:2181 --
replication-factor 1 --
partitions 1 --topic valid-messages
$ bin/kafka-topics --create --zookeeper localhost:2181 --
replication-factor 1 --

partitions 1 --topic invalid-messages

Then, start a console producer to the input-topic topic, as follows:

$ bin/kafka-console-producer --broker-list localhost:9092 --topic 
input-topic

This window is where the input messages are produced (typed).

  1. In the second command-line window, start a command-line consumer listening to the valid-messages topic, as follows:
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --
from-beginning -
-topic valid-messages

  1. In the third command-line window, start a command-line consumer listening to invalid-messages topic, as follows:
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --
from-beginning -
-topic invalid-messages
  1. In the fourth command-line terminal, start up the processing engine. From the project root directory (where the gradle jar command were executed) run this command:
$ java -jar ./build/libs/monedero-0.2.0.jar localhost:9092 foo 
input-topic valid-
messages invalid-messages

From the first command-line terminal (the console producer), send the following three messages (remember to type enter between messages and execute each one in just one line):

{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "14862768", "name": "Snowden, Edward", "ipAddress": "95.31.18.111"}, "currency": {"name": "ethereum", "price": "USD"}, "timestamp": "2018-09-28T09:09:09Z"}
{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "13548310", "name": "Assange, Julian", "ipAddress": "185.86.151.11"}, "currency": {"name": "ethereum", "price": "USD"}, "timestamp": "2018-09-28T08:08:14Z"}
{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "15887564", "name": "Mills, Lindsay", "ipAddress": "186.46.129.15"}, "currency": {"name": "ethereum", "price": "USD"}, "timestamp": "2018-09-28T19:51:35Z"}

As these are valid messages, the messages typed in the producer console should appear in the valid-messages consumer console window, as in the example:

{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "14862768", "name": "Snowden, Edward", "ipAddress": "95.31.18.111", "country":"Russian Federation","city":"Moscow"}, "currency": {"name": "ethereum", "price": "USD", "rate":0.0049}, "timestamp": "2018-09-28T09:09:09Z"}
{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "13548310", "name": "Assange, Julian", "ipAddress": "185.86.151.11", "country":"United Kingdom","city":"London"}, "currency": {"name": "ethereum", "price": "USD", "rate":0.049}, "timestamp": "2018-09-28T08:08:14Z"}
{"event": "CUSTOMER_CONSULTS_ETHPRICE", "customer": {"id": "15887564", "name": "Mills, Lindsay", "ipAddress": "186.46.129.15", "country":"Ecuador","city":"Quito"}, "currency": {"name": "ethereum", "price": "USD", "rate":0.049}, "timestamp": "2018-09-28T19:51:35Z"}

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.190.207.144