In chapter 10, you put together the Cayambe hybrid, combining the slimmed-down monolith with your new microservices. This chapter simplifies the access of administration data in the Cayambe hybrid by switching it to use data streaming.
First you’ll learn about data streaming and how it can benefit developers and architects alike. Taking those lessons, you’ll develop a data-streaming solution for the Cayambe hybrid from the previous chapter, completing the journey from monolith to hybrid.
Before delving into Apache Kafka, the solution you’re going to use for recording and processing data streams, you need some background on data streaming. Otherwise, what Apache Kafka does and how it works will be completely foreign to you as an Enterprise Java developer.
Data streaming doesn’t just refer to the way Netflix gets its movies to play on all your devices. It also refers to a continuously generated stream of data from potentially thousands of sources; each piece of data, or record, is small in size and is stored in the sequence that it was received. That may seem like a lot of buzzwords, but data streaming is still relatively new, and new ways to use it are always being conceived.
What kinds of data apply to data streaming? In a nutshell, pretty much any type of data could be useful in the context of data streaming. Common examples include measurements from vehicle sensors, real-time share prices from the stock market, and trending topics from social networks and sites.
A common use case for data streaming occurs when you have a lot of data, or records, and you want to analyze it for patterns or trends. It may well be that large amounts of the data can be completely ignored, and only key pieces of data are pertinent. It’s also possible for the same set of data records to have different purposes, dependent on which system might be consuming it! For instance, an e-commerce site that captures a stream of page-visit events can use the same data not only to record the number of pages a user visits before purchasing, but also to analyze the number of views each page is getting across all users. This is the beauty of data streaming: you can solve different problems with the same set of data.
Figure 11.1 illustrates how data is received as a stream.
The data for a particular type is received from potentially many sources, and is added to the end of the stream, or pipe, in the order it was received by the system that’s responsible for recording the data stream. There’s no concept of inserting a particular record at a given point in the stream. Everything is added to the end as it’s received.
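The append-only behavior described above can be sketched in a few lines of plain Java. This is only an in-memory illustration, not a Kafka API: the class name AppendOnlyStream and its methods are hypothetical, and the sensor readings are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical in-memory stand-in for a data stream; not part of Kafka.
class AppendOnlyStream {
    private final List<String> records = new ArrayList<>();

    // The only write operation: add to the end and return the record's position.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Read-only view, so callers can't insert or reorder records.
    List<String> readAll() {
        return Collections.unmodifiableList(records);
    }

    public static void main(String[] args) {
        AppendOnlyStream stream = new AppendOnlyStream();
        stream.append("sensor-a: 21.5");
        stream.append("sensor-b: 19.0");
        long position = stream.append("sensor-a: 21.7");
        System.out.println("Last record stored at position " + position);
        System.out.println(stream.readAll());
    }
}
```

Notice that there's deliberately no method for inserting at a given index; the order of the stream is purely the order of arrival.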
Though you have several options for recording and processing data streams, for this chapter you’ll focus on Apache Kafka, so let’s take a look at it now.
To clarify the terminology used throughout this chapter, the terms data streaming, data streams, and streaming all refer to the same thing: the process of streaming data from a source for capture.
Apache Kafka (https://kafka.apache.org/) was originally developed by LinkedIn in 2010 to be the core for its central data pipeline. Currently, the pipeline processes upward of 2 trillion messages a day! In early 2011, Apache Kafka was proposed as an open source project at Apache, and it moved out of the incubation phase in late 2012. Within the space of a few years, many enterprises have adopted Apache Kafka, including Apple, eBay, Netflix, Spotify, and Uber.
What is Apache Kafka? It’s a distributed streaming platform. What do I mean by distributed here? Apache Kafka can partition data from a single data stream across multiple servers within a cluster. In addition, each partition is replicated across servers for fault tolerance of that data.
There are many ways to configure Apache Kafka, in terms of how it’s distributed and its level of fault tolerance, but those topics are beyond the scope of this chapter. For full details, take a look at the Apache Kafka documentation at https://kafka.apache.org/documentation/.
As a streaming platform, the key capabilities that Apache Kafka provides are as follows:
At its core, Apache Kafka is a distributed commit log: it doesn’t notify sources that a piece of data has been recorded in the stream until it’s committed to the log. Being distributed, as I mentioned before, refers to each commit in the log, or stream, being spread across partitions and replicated.
Another way of describing Apache Kafka is as a database with no clothes: the data is at the forefront and not hidden. Databases, at their core, use a commit log, as Apache Kafka does, to track changes and as a means of recovering from server failures to reconstitute the database. With Apache Kafka, the clothes of the database (tables, indexes, and so forth) have been stripped away, leaving just the commit log. This makes the data in Apache Kafka far more directly consumable and accessible than it is in a regular database.
Apache Kafka also uses semantics that are familiar to Enterprise Java developers who have integrated with messaging systems. There are producers, which generate records or events that are added to the stream, equivalent to the multiple sources shown earlier in figure 11.1. Each stream of records is referred to as a topic, and anything that reads records from the stream is a consumer. Figure 11.2 shows how producers and consumers integrate with Apache Kafka.
In addition, connectors enable databases, or other systems, to be sources of records being sent to Apache Kafka. Finally, stream processors have the ability to stream records from one or more topics, perform some type of transformation on the data, and then output it to one or more different topics.
Now that you understand some of the pieces that make up Apache Kafka, let’s define what a record means. Each record within a stream consists of a key, a value, and a timestamp. The key and value are straightforward in terms of their purpose, but why is a timestamp needed? The timestamp is crucial to Apache Kafka knowing when a record was received (which will become even more critical when we cover partitions).
You also need to be aware of additional concepts about records before continuing. Each record is immutable within the data stream: you can’t edit, modify, or remove a record from the data stream after it’s been added. All you can do is provide update records for the same key that sets a different value.
Figure 11.3 expands on the stream from figure 11.1, showing possible records for a real-time stream of share prices. In figure 11.3, you can see that there’s no single record for the key RHT. There are currently three, all with different values. This is the immutability of the data stream. If the stream weren’t immutable, there likely would be a single record with key RHT that’s continually updated with a new value.
A big advantage with immutable data streams is that you have a history of change for the same key. Certainly in some situations, you might be concerned with only the current value of something. But far more often, knowing the history and being able to determine change over time is critical.
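To make the idea of a change history concrete, here's a minimal sketch of an immutable stream of share prices. It's plain Java rather than Kafka, and the class SharePriceStream, its methods, and the prices are all hypothetical; only the key RHT comes from figure 11.3.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an immutable stream of share prices; not a Kafka API.
class SharePriceStream {

    // A record is a key, a value, and a timestamp, and never changes once created.
    static final class Record {
        final String key;
        final double value;
        final long timestamp;

        Record(String key, double value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Record> log = new ArrayList<>();

    // An "update" is just another record appended for the same key.
    void append(String key, double value, long timestamp) {
        log.add(new Record(key, value, timestamp));
    }

    // Every value ever recorded for the key, oldest first.
    List<Double> historyFor(String key) {
        List<Double> history = new ArrayList<>();
        for (Record record : log) {
            if (record.key.equals(key)) {
                history.add(record.value);
            }
        }
        return history;
    }

    // The most recently appended value for the key, or null if there is none.
    Double latestFor(String key) {
        Double latest = null;
        for (Record record : log) {
            if (record.key.equals(key)) {
                latest = record.value;
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        SharePriceStream stream = new SharePriceStream();
        stream.append("RHT", 91.5, 1L);   // sample prices, made up for illustration
        stream.append("RHT", 92.1, 2L);
        stream.append("ABC", 10.0, 3L);
        stream.append("RHT", 91.8, 4L);
        System.out.println("History for RHT: " + stream.historyFor("RHT"));
        System.out.println("Latest for RHT: " + stream.latestFor("RHT"));
    }
}
```

A consumer that cares only about the current price reads the latest value, whereas one that charts price movement reads the whole history, and both work from the same records.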
Records are also persisted: the log is retained on the filesystem, allowing the records to be processed at any point in the future. Having said that, limits exist on how long a record is retained. Each record is persisted only so long as is allowed by the retention policy on that particular topic. A topic could be defined to retain records indefinitely, presuming disk space isn’t an issue, or it could be purged after a few days, whether or not it has been consumed by anything.
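The effect of a time-based retention policy can be illustrated with a small sketch. This is a simplification under stated assumptions: the class RetentionSketch and its method are hypothetical, and a real Kafka broker applies retention to whole log segments rather than record by record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of time-based retention; not how a broker is implemented.
class RetentionSketch {

    static final class Record {
        final String value;
        final long timestampMillis;

        Record(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    // Keeps only records that are no older than the retention period,
    // regardless of whether any consumer has read them.
    static List<Record> purgeExpired(List<Record> log, long nowMillis, long retentionMillis) {
        List<Record> kept = new ArrayList<>();
        for (Record record : log) {
            if (nowMillis - record.timestampMillis <= retentionMillis) {
                kept.add(record);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Record> log = new ArrayList<>();
        log.add(new Record("old", 0L));
        log.add(new Record("recent", 50L));
        log.add(new Record("new", 100L));
        List<Record> kept = purgeExpired(log, 100L, 60L);
        System.out.println("Records kept: " + kept.size());
    }
}
```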
Topics in Kafka relate to a category, or type, of record that can be published and consumed. For instance, you’d use one topic for real-time share prices and a separate topic for measurements from vehicle sensors.
Each topic is divided into one or more partitions, across one or more servers within the Kafka cluster. Partitioning takes a single logical data stream, or topic, such as those you saw in figures 11.1 and 11.3, and splits it into multiple physical data streams.
Figure 11.4 shows a partition. Partitioning of a topic increases the parallelism that can be achieved when writing or reading from a specific topic. The figure illustrates a single topic that’s split into three partitions. Each partition represents an ordered and immutable sequence of records that’s continually appended to, creating a structured log of change events within a data stream. Each record in a given partition is assigned a sequential ID number known as the offset. The offset uniquely identifies a record within a specific partition.
A critical point with Kafka records that developers need to be especially mindful of is the definition of the key to be associated with a record. If the key isn’t truly unique within the context of the business, there’s the danger of overlap between key and timestamp combinations—especially as Kafka guarantees that all records with the same key are placed on the same partition, ensuring that all records for a key are stored in sequence on a single partition.
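The guarantee that records with the same key land on the same partition follows from deriving the partition from the key. The sketch below shows the idea with a hypothetical KeyPartitioner class. It uses Java's hashCode() as a stand-in; Kafka's default partitioner hashes the serialized key bytes with a different hash function, so the partition numbers here won't match a real cluster.

```java
// Hypothetical sketch of key-based partition assignment; not Kafka's partitioner.
class KeyPartitioner {

    // Maps a key to a partition in the range [0, numPartitions).
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;
        // The same key always maps to the same partition.
        System.out.println("RHT -> partition " + partitionFor("RHT", partitions));
        System.out.println("RHT -> partition " + partitionFor("RHT", partitions));
        System.out.println("7   -> partition " + partitionFor(7, partitions));
    }
}
```

Because the mapping depends only on the key and the number of partitions, two business entities that accidentally share a key will also share a partition and interleave their records, which is why key design matters.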
Figure 11.5 shows how producers and consumers interact with a topic partition.
As mentioned previously, producers always write new records to the very end of a partition. Consumers typically process records sequentially, but are able to specify at which offset they begin processing. For instance, in figure 11.5, Consumer B may have begun reading from offset 0 and is now processing offset 11. Consumer A is at offset 9 but may have begun reading records from that offset only and not from 0.
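A short sketch can show how two consumers read the same partition independently. The class OffsetConsumerSketch and its nested Consumer are hypothetical and hold everything in memory; a real Kafka consumer tracks its position through the client library instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of consumers reading one partition; not the Kafka client API.
class OffsetConsumerSketch {

    static final class Consumer {
        private final List<String> partition;
        private int nextOffset;

        // Each consumer chooses the offset it starts reading from.
        Consumer(List<String> partition, int startOffset) {
            this.partition = partition;
            this.nextOffset = startOffset;
        }

        // Returns the next record in sequence, or null when caught up with the end.
        String poll() {
            if (nextOffset >= partition.size()) {
                return null;
            }
            return partition.get(nextOffset++);
        }

        int position() {
            return nextOffset;
        }
    }

    public static void main(String[] args) {
        List<String> partition = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            partition.add("record-" + i);   // offsets 0 through 11
        }

        Consumer consumerB = new Consumer(partition, 0);  // reads from the beginning
        Consumer consumerA = new Consumer(partition, 9);  // skips earlier records

        System.out.println("B reads " + consumerB.poll());
        System.out.println("A reads " + consumerA.poll());
    }
}
```

Reading doesn't remove anything from the partition, so one consumer's progress has no effect on where another consumer is.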
Figure 11.5 introduces some concepts regarding consumers that are worth elaborating on, so you’re familiar with what they can do:
Figure 11.6 is a brief reminder of what has been developed and integrated so far with the Cayambe hybrid. The grayed-out piece is to be completed in this chapter and links the Admin and Cayambe databases, via an Apache Kafka topic, to remove the need for the Cayambe database to manage categories directly. This makes it possible to simply feed the data from one database to another.
Without data streaming in figure 11.6, you have a few alternatives:
To properly support the model in figure 11.6, you want to be able to convert the database change events into records in Kafka for you to process. Such a solution has the least impact on the Admin microservice while still enabling you to consume its data. What you need is a connector for Kafka that can do this for you.
Are any tools available that would make that possible? Why, yes, there are! Debezium is an open source project for streaming changes out of databases into Kafka.
Debezium is a distributed platform for change data capture. You can start Debezium, point it at your databases, and react to each insert, update, or delete that’s made on those databases in completely separate applications. Debezium allows you to consume database row-level changes, without any impact or changes to applications that currently perform those database updates directly. A huge benefit with Debezium is that any applications or services consuming the database changes can be taken down for maintenance without losing a single change. Debezium is still recording the changes into Kafka, ready for consumption when the services are available again. Full details on Debezium can be found at http://debezium.io/.
To gain a better understanding of Apache Kafka, data streaming, and how they can be integrated into your microservices, you aren’t going to implement Debezium for the Cayambe hybrid in this chapter. I’ll leave that as an additional exercise for you.
For the Cayambe hybrid, you’re going to directly produce events into Apache Kafka, and then consume them on the other side. Figure 11.7 shows the changes you’re going to make to the architecture, in support of showing more details of the way Apache Kafka works.
You’re adding code to the Admin microservice to produce events that will be sent to an Apache Kafka topic. Then you have a Kafka microservice to consume those events and update the Cayambe database with the changes.
You’re still going to use data streaming to move the data you need from one place to another. Producing the events yourself, instead of using something like Debezium, is a bit less efficient for real production use, but it’s beneficial for understanding what’s going on.
Before looking at implementing the microservices to integrate with Kafka, let’s get Kafka up and running on OpenShift! If you don’t have Minishift running already, let’s start it now just as you did in chapter 10:
> minishift start --cpus 3 --memory 4GB
After Minishift is up and running, start the OpenShift console and log in. In your existing project, click Add to Project and then click Import YAML/JSON.
Paste into the text box the contents of /chapter11/resources/Kafka_OpenShift.yml, some snippets of which are in this listing.
apiVersion: v1
kind: Template
metadata:
  name: strimzi
  annotations:
    openshift.io/display-name: "Apache Kafka (Persistent storage)"
    description: >-
      This template installs Apache Zookeeper and Apache Kafka clusters.
      For more information see https://github.com/strimzi/strimzi
    tags: "messaging,datastore"
    iconClass: "fa pficon-topology"
    template.openshift.io/documentation-url: "https://github.com/strimzi/strimzi"
message: "Use 'kafka:9092' as bootstrap server in your application"
...
objects:
- apiVersion: v1
  kind: Service
  metadata:
    name: kafka
  spec:
    ports:
    - name: kafka
      port: 9092
      targetPort: 9092
      protocol: TCP
    selector:
      name: kafka
    type: ClusterIP
...
- apiVersion: v1
  kind: Service
  metadata:
    name: zookeeper
  spec:
    ports:
    - name: clientport
      port: 2181
      targetPort: 2181
      protocol: TCP
    selector:
      name: zookeeper
    type: ClusterIP
...
Hold on there, what’s ZooKeeper doing there? It wasn’t mentioned before! That’s right, it wasn’t mentioned before. ZooKeeper is an implementation detail because it’s used internally by Kafka as a distributed key/value store. It’s not something you need to interact with. You’re seeing it here because you’re acting as operations staff to set up Kafka for yourself.
/chapter11/resources/Kafka_OpenShift.yml was originally copied from http://mng.bz/RqUn, but was modified to have only a single Kafka broker instead of three. As a result, it doesn’t support topic replication, but your OpenShift instance needs fewer resources to run Kafka!
After you’ve pasted the contents of the modified file into the pop up, click Create and then Continue to see a form where you can specify different default values. For now, leave those as they are and click Create at the bottom of the page. OpenShift will now provision a Kafka cluster with a single broker, which you can see from the main console page under the strimzi application.
It can take a little time to complete the downloading of the necessary Docker images and then start the containers. Don’t be concerned if the Kafka cluster fails initially if ZooKeeper isn’t running yet. Given time, it’ll restart, and everything will be running as expected.
After all the pods are started, open a terminal window and log into the OpenShift client, if you’re not already. You need to retrieve all the OpenShift services to find the IP address for ZooKeeper:
> oc get services
NAME                 CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
kafka                172.30.225.60   <none>        9092/TCP   5h
kafka-headless       None            <none>        9092/TCP   5h
zookeeper            172.30.93.118   <none>        2181/TCP   5h
zookeeper-headless   None            <none>        2181/TCP   5h
From the list, you can see the ZooKeeper IP address is 172.30.93.118. Head back to the OpenShift console and select Applications and then Pods from the menu options. This provides a list of the running pods in OpenShift. With a single broker, there should be only a single kafka-* pod. Click that pod and then select the Terminal tab, and you should see something similar to figure 11.8.
To use Kafka, you need to create a topic for your records. Let’s do that within the Terminal tab:
./bin/kafka-topics.sh --create --topic category_topic --replication-factor 1 --partitions 1 --zookeeper 172.30.93.118:2181
You use a Kafka script to create a topic named category_topic that has only a single partition and a replication factor of 1. You specify a single partition and a replication factor of 1 because you have a single broker in the cluster. For instance, if you had three brokers in the cluster, you could use three partitions and a replication factor of 2.
Now that Kafka is running and your topic is created, it’s time to modify the Admin microservice to produce events onto the topic!
To assist in integrating your Enterprise Java code with Kafka, you’ll use a library that converts the pull approach of Kafka into a push approach. This library is still in its infancy but is easy to use because it removes a lot of the boilerplate code that’s required when using the Kafka APIs directly. It’s written as a CDI extension and is available as Maven artifacts for you to consume. The code is available at https://github.com/aerogear/kafka-cdi.
What’s the advantage of converting Kafka’s pull approach into a push one? It’s beneficial for those of us more familiar with Enterprise Java development, where with the CDI programming model we’re able to listen for events and perform an action when we receive one. This is what the Kafka library we’re using brings for us, the ability to listen for events every time a new record is written to a topic, just as if it were a CDI event listener.
The first thing you need to do is update the pom.xml of the Admin microservice to use the new dependency:
<dependency>
  <groupId>org.aerogear.kafka</groupId>
  <artifactId>kafka-cdi-extension</artifactId>
  <version>0.0.10</version>
</dependency>
Next you modify CategoryResource to connect with the Kafka topic, and produce records to be appended onto it.
@Path("/")
@ApplicationScoped
// Locate Kafka through the environment variables for the kafka service
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class CategoryResource {
    ...
    // Producer with an Integer key and a Category value
    @Producer
    private SimpleKafkaProducer<Integer, Category> producer;
    ...
    public Response create(Category category) throws Exception {
        ...
        // Append a record for the new category to the topic
        producer.send("category_topic", category.getId(), category);
        ...
    public Response remove(@PathParam("categoryId") Integer categoryId,
                           @Context SecurityContext context) throws Exception {
        ...
        // A null value signals that the category was removed
        producer.send("category_topic", categoryId, null);
        ...
}
With the changes made to the Admin microservice, you can now deploy it! Before deploying the microservice, you need to have Keycloak running, because your microservice uses it to secure the delete endpoint. To do that, you need to run this:
/chapter9/keycloak> java -Dswarm.http.port=9090 -jar keycloak-2018.1.0-swarm.jar
If the database files haven’t been removed from the directory, Keycloak should start up and remember all the settings you’ve installed previously. With Keycloak running again, you can now deploy Admin:
/chapter11/admin> mvn clean fabric8:deploy -Popenshift
After the microservice is up and running, you can update and delete categories through the new Administration UI, or by sending HTTP requests directly with Postman. How do you know the Admin microservice is correctly putting records onto the Kafka topic? You don’t have anything consuming those records!
Thankfully, Kafka provides a consumer you can use in a console to see the contents of a topic. In the OpenShift console, you go back to the kafka-* pod, as you had before, and select the Terminal tab. On the command line, run the following:
./bin/kafka-console-consumer.sh --bootstrap-server 172.30.225.60:9092 --from-beginning --topic category_topic
Alternatively, you could connect to the kafka-* pod and run the command remotely:
oc rsh kafka-<identifier> ./bin/kafka-console-consumer.sh --bootstrap-server 172.30.225.60:9092 --from-beginning --topic category_topic
You used the IP address and port of the Kafka service from the list of OpenShift services you retrieved earlier to specify where Kafka is located. Next you tell the script you want to consume all records from the beginning, which is the same as saying from offset 0. Finally, you give it the name of the topic. If all has worked OK, you should see a record appear for each change you made through the Admin microservice.
We’ve covered the producing side of the Kafka topic. Now let’s look at the consuming side.
All the code for the Kafka consumer is in the /chapter11/kafka-consumer/ directory of the book’s code. As with the producer, you add the kafka-cdi-extension dependency to the pom.xml. The remainder of the pom.xml contains the usual Thorntail plugin and dependencies, and the fabric8 Maven plugin for deploying to OpenShift. You also specify a MySQL JDBC driver dependency so you can update the records within the Cayambe database.
For connecting to the Cayambe database, you need to define a DataSource.
swarm:
  datasources:
    data-sources:
      CayambeDS:
        driver-name: mysql
        connection-url: jdbc:mysql://mysql:3306/cayambe
        user-name: cayambe
        password: cayambe
        valid-connection-checker-class-name: org.jboss.jca.adapters.jdbc.extensions.mysql.MySQLValidConnectionChecker
        validate-on-match: true
        background-validation: false
        exception-sorter-class-name: org.jboss.jca.adapters.jdbc.extensions.mysql.MySQLExceptionSorter
Finally, you create a class to process the records from the Kafka topic, as you receive them.
@ApplicationScoped
@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class CategoryEventListener {

    private static final String DATASOURCE = "java:/jboss/datasources/CayambeDS";

    private DataSource dataSource = null;

    @Consumer(topics = "category_topic", keyType = Integer.class,
              groupId = "cayambe-listener", offset = "earliest")
    public void handleEvent(Integer key, Category category) {
        if (null == category) {
            // Remove Category
            executeUpdateSQL("delete from category where category_id = " + key);

            // Remove from Category Hierarchy
            executeUpdateSQL("delete from category_category where category_id = " + key);
            executeUpdateSQL("delete from category_category where parent_id = " + key);
        } else {
            boolean update = rowExists("select * from category where category_id = " + key);

            if (update) {
                // Update Category (each assignment in the set clause is comma separated)
                executeUpdateSQL("update category set name = '" + category.getName()
                    + "', header = '" + category.getHeader()
                    + "', image = '" + category.getImagePath()
                    + "' where category_id = " + key);
            } else {
                // Create Category
                executeUpdateSQL("insert into category (id,name,header,visible,image) values("
                    + key + ",'" + category.getName() + "', '" + category.getHeader() + "', "
                    + (category.isVisible() ? 1 : 0) + ", '" + category.getImagePath() + "')");
                executeUpdateSQL("insert into category_category (category_id, parent_id)"
                    + " values (" + category.getId() + "," + category.getParent().getId() + ")");
            }
        }
    }

    private void executeUpdateSQL(String sql) {
        // try-with-resources closes the statement and connection even on failure
        try (Connection conn = getDatasource().getConnection();
             Statement statement = conn.createStatement()) {
            statement.executeUpdate(sql);
        } catch (Exception e) {
            ...
        }
    }

    boolean rowExists(String sql) {
        try (Connection conn = getDatasource().getConnection();
             Statement statement = conn.createStatement();
             ResultSet results = statement.executeQuery(sql)) {
            // A row in the result means the category already exists
            return results.next();
        } catch (Exception e) {
            ...
        }
        return false;
    }

    private DataSource getDatasource() {
        // Look up the DataSource once and reuse it
        if (null == dataSource) {
            try {
                dataSource = (DataSource) new InitialContext().lookup(DATASOURCE);
            } catch (NamingException ne) {
                ne.printStackTrace();
            }
        }
        return dataSource;
    }
}
CategoryEventListener registers a method to listen to the Kafka events, by defining the key type, value type, which topic you’re processing, a consumer group, and that you want to process all records in the stream from the beginning. When you receive a Kafka record, you then determine whether you need to remove a category (the value is null) or whether you’re processing a new or updated record.
To distinguish between update and new categories, you execute an SQL statement on the existing categories in Cayambe to see whether this record exists. If it does, it’s an update record; if it doesn’t, it’s a new one.
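The decision the listener makes for each record can be isolated from the database code. In this sketch, the class ChangeDispatchSketch, the Action enum, and the set of existing IDs are all hypothetical; the set stands in for the SQL lookup against the Cayambe database, and a String stands in for the Category value.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of how a record is classified; the Set replaces the SQL lookup.
class ChangeDispatchSketch {

    enum Action { DELETE, UPDATE, INSERT }

    // A null value means removal; otherwise the key's presence decides update vs. insert.
    static Action classify(Integer key, String value, Set<Integer> existingIds) {
        if (value == null) {
            return Action.DELETE;
        }
        return existingIds.contains(key) ? Action.UPDATE : Action.INSERT;
    }

    public static void main(String[] args) {
        Set<Integer> existingIds = new HashSet<>();
        existingIds.add(1);

        System.out.println(classify(1, "Renamed category", existingIds));
        System.out.println(classify(2, "Brand new category", existingIds));
        System.out.println(classify(1, null, existingIds));
    }
}
```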
If you didn’t want the overhead of running an SQL statement to determine whether you’re dealing with an update or a new category, you could change the value type for the records in Kafka to be an enclosing object. The Category instance, the current value, can be a field on a new type, with a flag to indicate the type of change event that’s being dealt with.
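One possible shape for such an enclosing object is sketched here. Every name in it is hypothetical, including CategoryEventSketch, CategoryEvent, and ChangeType, and the Category class is cut down to two fields for illustration. How the event would be serialized onto the topic isn't shown.

```java
// Hypothetical sketch of an enclosing event type; none of these names are from the book's code.
class CategoryEventSketch {

    enum ChangeType { CREATED, UPDATED, REMOVED }

    // Cut-down stand-in for the real Category class.
    static final class Category {
        final Integer id;
        final String name;

        Category(Integer id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // The record value: the category plus a flag describing what happened to it.
    static final class CategoryEvent {
        final ChangeType type;
        final Category category;

        CategoryEvent(ChangeType type, Category category) {
            this.type = type;
            this.category = category;
        }
    }

    // The consumer can branch on the flag without querying the database first.
    static String describe(CategoryEvent event) {
        switch (event.type) {
            case CREATED:
                return "insert category " + event.category.id;
            case UPDATED:
                return "update category " + event.category.id;
            default:
                return "delete category " + event.category.id;
        }
    }

    public static void main(String[] args) {
        Category category = new Category(42, "Sample");
        System.out.println(describe(new CategoryEvent(ChangeType.CREATED, category)));
        System.out.println(describe(new CategoryEvent(ChangeType.REMOVED, category)));
    }
}
```

The trade-off is that the producer now has to know which kind of change it's publishing, and a removal carries a value rather than null.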
Now that you’ve finished developing the Kafka consumer, you’re ready to see it all working in unison! But before you deploy the Kafka consumer you just created, to see the visual changes as they happen, it’s worth starting up the Cayambe hybrid from chapter 10 with the following:
/wildfly-11.0.0.Final/bin/standalone.sh
With Cayambe started, open a browser and navigate around the category tree. You should notice that any changes you made through the Admin microservice aren’t visible, which makes sense because you haven’t activated the process to update the Cayambe database with any changes. So let’s start your Kafka consumer now:
/chapter11/kafka-consumer> mvn clean fabric8:deploy -Popenshift
When the pod becomes operational, it should process all the records that are present on the Kafka topic, because you specified for it to begin at the earliest offset on the topic. You can open the logs of the service and see the console statements that were printed for each record processed.
With the records processed by the Kafka consumer, go back to the Cayambe UI and refresh the page. When navigating through the category tree and finding categories that were changed through the Admin microservice, you’ll notice that they’re now updated or removed based on what you did earlier.
You’ve successfully decoupled the data between the two systems so that one owns the data, the Admin microservice, and the other consumes a copy of it in a read-only manner. As an added benefit, as long as the Kafka producers and consumers are functioning, the data never becomes stale.
As discussed earlier in the chapter, for an additional exercise, try converting the Cayambe hybrid to use Debezium to process database entries directly, instead of by you producing records within the Admin microservice.
This will also provide another benefit over the current solution, because the category hierarchy can be completely reconstructed from the Kafka topic records whenever needed. The topic will contain records for all the inserts you did to load the database initially, as well as any insertions, updates, and removals that have occurred since then.
Additional details on developing microservices with Spring Boot can be found in the appendix.