Kafka setup

The first step in setting up the environment for our big data use case is to establish a Kafka node. Kafka is essentially a first-in, first-out (FIFO) queue, so we will use the simplest single-node (broker) setup. Kafka organizes data using topics, producers, consumers, and brokers.

Important Kafka terminology:

  • A broker is essentially a node
  • A producer is a process writing data to the message queue
  • A consumer is a process reading data from the message queue
  • A topic is the specific queue that we write data to and read data from

A Kafka topic is further subdivided into a number of partitions. Partitions allow us to split a topic's data across multiple brokers (nodes), both when we write to the topic and when we read our data at the other end of the queue.
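
To see how a key controls which partition a message lands on, here is a minimal sketch using the ruby-kafka gem that we introduce later in this section; the broker address, client_id, and key are illustrative:

require 'kafka'

# Connect to our single local broker
kafka = Kafka.new(['localhost:9092'], client_id: 'partition-demo')

# Messages that share the same partition_key are always routed to the same
# partition, which preserves their relative ordering within that partition
kafka.deliver_message('{"pair":"xmr-btc","price":0.0123}',
                      topic: 'xmr-btc',
                      partition_key: 'xmr-btc')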

After installing Kafka on our local machine or any cloud provider of our choice (there are excellent tutorials for EC2 to be found just a search away), we can create a topic using this single command:

$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xmr-btc
Created topic "xmr-btc".

This will create a new topic called xmr-btc.

Deleting the topic is similar to creating it:

$ kafka-topics --delete --zookeeper localhost:2181 --topic xmr-btc

We can then get a list of all topics by issuing the following:

$ kafka-topics --list --zookeeper localhost:2181
xmr-btc

We can then create a command-line producer for our topic, just to test that we can send messages to the queue, like this:

$ kafka-console-producer --broker-list localhost:9092 --topic xmr-btc

Data on every line will be sent as a string-encoded message to our topic, and we can end the process by sending a SIGINT signal (typically Ctrl + C).

Afterwards, we can view the messages that are waiting in our queue by spinning up a consumer:

$ kafka-console-consumer --zookeeper localhost:2181 --topic xmr-btc --from-beginning

This consumer will read all messages in our xmr-btc topic, starting from the beginning of history. This is useful for our test purposes, but we will change this configuration in real-world applications.
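
With the ruby-kafka gem that we use later in this section, the equivalent choice is made through the start_from_beginning option; a minimal sketch (the broker address, client_id, and group name are illustrative):

require 'kafka'

kafka = Kafka.new(['localhost:9092'], client_id: 'offset-demo')
consumer = kafka.consumer(group_id: 'xmr-latest-only')

# start_from_beginning: false means a brand-new consumer group only sees
# messages produced after it first connects, instead of replaying history
consumer.subscribe('xmr-btc', start_from_beginning: false)

consumer.each_message do |message|
  puts message.value
end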

You will keep seeing ZooKeeper mentioned alongside Kafka in these commands. Apache ZooKeeper ships with Apache Kafka and is a centralized service that Kafka uses internally for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Now that we have our broker set up, we can use the code at https://github.com/agiamas/mastering-mongodb/tree/master/chapter_9 to start reading (consuming) messages from the queue and writing (producing) messages to it. For our purposes, we are using the ruby-kafka gem, developed by Zendesk.
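
If you manage dependencies with Bundler, a Gemfile along the following lines pulls in everything used in this section (JSON and CSV ship with Ruby's standard library):

source 'https://rubygems.org'

# Kafka client from Zendesk, used for producing and consuming messages
gem 'ruby-kafka'
# Official low-level MongoDB driver, used later in this section
gem 'mongo'
# Optional: only needed if you enable the debugging breakpoint in the consumer
gem 'byebug'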

For simplicity, we are using a single class to read from a file stored on disk and write to our Kafka queue.
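
The produce and consume methods below assume that a @kafka client has already been created in the class constructor; the full class is in the GitHub repository, but a possible initializer looks like this (the class name and client_id are illustrative):

require 'kafka'
require 'csv'
require 'json'

class ExchangeFeeder # illustrative name; see the repository for the actual class
  def initialize
    # Connect to our single local broker; client_id merely identifies this
    # application in the broker's logs and metrics
    @kafka = Kafka.new(['localhost:9092'], client_id: 'xmr-btc-exchange-client')
  end
end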

Our produce method will be used to write messages to Kafka:

def produce
  options = { converters: :numeric, headers: true }
  # Read the CSV file row by row, convert each row to JSON,
  # and deliver it synchronously to the xmr-btc topic
  CSV.foreach('xmr_btc.csv', options) do |row|
    json_line = JSON.generate(row.to_hash)
    @kafka.deliver_message(json_line, topic: 'xmr-btc')
  end
end
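
Note that deliver_message performs one synchronous round trip per row, which is fine for a test but slow for large files. ruby-kafka also offers a buffered producer that sends messages in batches; a rough sketch under the same assumptions as above (the method name and batch size are illustrative):

def produce_buffered
  producer = @kafka.producer
  options = { converters: :numeric, headers: true }
  CSV.foreach('xmr_btc.csv', options).with_index(1) do |row, i|
    # produce only appends the message to a local buffer
    producer.produce(JSON.generate(row.to_hash), topic: 'xmr-btc')
    # Flush periodically so the local buffer does not overflow
    producer.deliver_messages if (i % 500).zero?
  end
  # deliver_messages flushes whatever is left in the buffer to the broker
  producer.deliver_messages
ensure
  producer.shutdown if producer
end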

Our consume method will read messages from Kafka:

def consume
  consumer = @kafka.consumer(group_id: 'xmr-consumers')
  consumer.subscribe('xmr-btc', start_from_beginning: true)
  # Stop the consumer gracefully when the process receives SIGTERM
  trap('TERM') { consumer.stop }
  consumer.each_message(automatically_mark_as_processed: false) do |message|
    puts message.value
    if valid_json?(message.value)
      # byebug # uncomment to pause here and inspect each message while debugging
      MongoExchangeClient.new.insert(message.value)
      # Commit the offset only after the message has been written to MongoDB
      consumer.mark_message_as_processed(message)
    end
  end
  consumer.stop
end

Notice that we are using the consumer group API feature (added in Kafka 0.9) to get multiple consumers to access a single topic by assigning each partition to a single consumer. In the event of a consumer failure, its partitions will be reallocated to the remaining members of the group.
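
The valid_json? helper referenced above is defined in the full class in the repository; a minimal version could simply attempt to parse the string and rescue the error:

# Relies on the json standard library, already loaded for JSON.generate above
def valid_json?(json)
  JSON.parse(json)
  true
rescue JSON::ParserError
  false
end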

The next step is to write these messages to MongoDB.

First, we create our collection so that our documents expire after 1 minute. From the mongo shell:

> use exchange_data
> db.xmr_btc.createIndex( { "createdAt": 1 }, { expireAfterSeconds: 60 })
{
"createdCollectionAutomatically" : true,
"numIndexesBefore" : 1,
"numIndexesAfter" : 2,
"ok" : 1
}

This way, we create a new database called exchange_data with a new collection called xmr_btc whose documents automatically expire 60 seconds after insertion. For MongoDB to auto-expire documents, we need to provide a field with a datetime value that it can compare against the current server time. In our case, this is the createdAt field.
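
If you prefer to keep the setup in code, the same TTL index can also be created from Ruby with the mongo driver; a sketch, assuming a local mongod on the default port (expire_after is the driver option that maps to MongoDB's expireAfterSeconds):

require 'mongo'

client = Mongo::Client.new(['127.0.0.1:27017'], database: :exchange_data)
# Creating the index implicitly creates the collection if it does not exist yet
client[:xmr_btc].indexes.create_one({ createdAt: 1 }, expire_after: 60)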

For our use case, we will use the low-level mongo-ruby-driver. The code for MongoExchangeClient is as follows:

class MongoExchangeClient
  def initialize
    # Connect to the local mongod instance and select the xmr_btc collection
    @collection = Mongo::Client.new([ '127.0.0.1:27017' ], database: :exchange_data).database[:xmr_btc]
  end

  def insert(document)
    document = JSON.parse(document)
    # Set the field that the TTL index uses to expire documents
    document['createdAt'] = Time.now
    @collection.insert_one(document)
  end
end

This client connects to our local database, sets the createdAt field for the TTL document expiration, and saves the message to our collection.
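
As a quick sanity check, the client can also be exercised on its own with a hand-written JSON string (the values here are made up):

require 'json'

# Assumes the MongoExchangeClient class above has been loaded
doc = JSON.generate({ 'pair' => 'xmr-btc', 'price' => 0.0123, 'volume' => 42.0 })
MongoExchangeClient.new.insert(doc)
# The document disappears from the collection shortly after its 60-second TTL elapses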

With this setup, we can write messages to Kafka, read them at the other end of the queue, and write them into our MongoDB collection.
