The first step in setting up the environment for our big data use case is to establish a Kafka node. Kafka is essentially a first-in, first-out (FIFO) queue, so we will use the simplest setup: a single node (broker). Kafka organizes data using topics, producers, consumers, and brokers.
Important Kafka terminology:
- A broker is essentially a node
- A producer is a process writing data to the message queue
- A consumer is a process reading data from the message queue
- A topic is the specific queue that we write to and read data from
A Kafka topic is further subdivided into a number of partitions. Partitions let us split a topic's data across multiple brokers (nodes), both when we write to the topic and when we read our data at the other end of the queue.
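Conceptually, a keyed message is assigned to a partition by hashing its key modulo the partition count (the ruby-kafka gem we use later takes this approach with a CRC32 hash; the official Java client uses murmur2). The following sketch illustrates the mapping only, not the exact production algorithm:

```ruby
require 'zlib'

# Illustrative only: map a message key to one of N partitions.
# The hash function here (CRC32) is a stand-in to show the
# hash-then-modulo mapping; clients differ in the hash they use.
def partition_for(key, num_partitions)
  Zlib.crc32(key) % num_partitions
end

# The same key always lands on the same partition,
# which is how Kafka preserves per-key ordering.
puts partition_for('xmr-btc', 3)
```

Note that messages with a nil key are instead spread across partitions (for example, round-robin or randomly, depending on the client).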
After installing Kafka on our local machine, or on any cloud provider of our choice (excellent tutorials for EC2 are just a search away), we can create a topic using this single command:
$ kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xmr-btc
Created topic "xmr-btc".
This will create a new topic called xmr-btc. To delete a topic, we can issue the following command:
$ kafka-topics --delete --zookeeper localhost:2181 --topic xmr-btc
We can then get a list of all topics by issuing the following:
$ kafka-topics --list --zookeeper localhost:2181
xmr-btc
We can then create a command-line producer for our topic, just to test that we can send messages to the queue, like this:
$ kafka-console-producer --broker-list localhost:9092 --topic xmr-btc
Every line of input will be sent as a string-encoded message to our topic, and we can end the process by sending a SIGINT signal (typically Ctrl + C).
Afterwards, we can view the messages that are waiting in our queue by spinning up a consumer:
$ kafka-console-consumer --zookeeper localhost:2181 --topic xmr-btc --from-beginning
This consumer will read all messages in our xmr-btc topic, starting from the beginning of history. This is useful for our test purposes, but we will change this configuration in real-world applications.
Now that our broker is set up, we can use the code at https://github.com/agiamas/mastering-mongodb/tree/master/chapter_9 to start reading (consuming) messages from, and writing (producing) messages to, the queue. For our purposes, we are using the ruby-kafka gem, developed by Zendesk.
For simplicity, we are using a single class to read from a file stored on disk and write to our Kafka queue.
Our produce method will be used to write messages to Kafka:
def produce
  options = { converters: :numeric, headers: true }
  # Read the CSV file row by row and publish each row as a JSON message
  CSV.foreach('xmr_btc.csv', options) do |row|
    json_line = JSON.generate(row.to_hash)
    @kafka.deliver_message(json_line, topic: 'xmr-btc')
  end
end
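To see exactly what produce sends to the topic, here is a minimal sketch of the CSV-to-JSON transformation, using an inline string in place of the xmr_btc.csv file (the column names are illustrative assumptions):

```ruby
require 'csv'
require 'json'

# Inline stand-in for a couple of rows of xmr_btc.csv
data = "timestamp,price\n1520000000,0.030\n1520000060,0.031\n"

# With headers: true each row becomes a hash keyed by column name,
# and converters: :numeric turns the values into Integer/Float.
messages = CSV.parse(data, headers: true, converters: :numeric).map do |row|
  JSON.generate(row.to_hash)
end

puts messages.first
# => {"timestamp":1520000000,"price":0.03}
```

Each such JSON string is what deliver_message pushes onto the xmr-btc topic.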
Our consume method will read messages from Kafka:
def consume
  consumer = @kafka.consumer(group_id: 'xmr-consumers')
  consumer.subscribe('xmr-btc', start_from_beginning: true)
  trap('TERM') { consumer.stop }
  consumer.each_message(automatically_mark_as_processed: false) do |message|
    puts message.value
    if valid_json?(message.value)
      MongoExchangeClient.new.insert(message.value)
      # Only commit the offset once the message is safely in MongoDB
      consumer.mark_message_as_processed(message)
    end
  end
  consumer.stop
end
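The valid_json? helper guards against malformed messages before we attempt the database write. Its implementation is not shown above; one possible sketch (an assumption, not necessarily the repository's exact code) simply attempts to parse the string and rescues the parser error:

```ruby
require 'json'

# Hypothetical helper: returns true if the string parses as JSON.
def valid_json?(string)
  JSON.parse(string)
  true
rescue JSON::ParserError
  false
end

puts valid_json?('{"price": 0.03}')  # true
puts valid_json?('not json')         # false
```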
The next step is to write these messages to MongoDB.
First, we create our collection so that our documents expire after 1 minute. From the mongo shell:
> use exchange_data
> db.xmr_btc.createIndex( { "createdAt": 1 }, { expireAfterSeconds: 60 })
{
"createdCollectionAutomatically" : true,
"numIndexesBefore" : 1,
"numIndexesAfter" : 2,
"ok" : 1
}
This creates a new database called exchange_data with a new collection called xmr_btc, whose documents auto-expire 60 seconds after creation. For MongoDB to auto-expire documents, we need to provide a field with a datetime value, which is compared against the current server time; in our case, this is the createdAt field.
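The expiry check that MongoDB's background TTL task performs can be illustrated in plain Ruby: a document becomes eligible for deletion once its createdAt plus expireAfterSeconds falls behind the current time. (The TTL task runs roughly every 60 seconds, so deletion is not instantaneous.)

```ruby
# Illustration of the comparison MongoDB's TTL monitor performs
# server-side; this is not client code we need to write ourselves.
EXPIRE_AFTER_SECONDS = 60

def expired?(created_at, now = Time.now)
  created_at + EXPIRE_AFTER_SECONDS < now
end

doc_time = Time.now - 120    # created two minutes ago
puts expired?(doc_time)      # true: past its 60-second TTL
```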
For our use case, we will use the low-level mongo-ruby-driver. The code for MongoExchangeClient is as follows:
class MongoExchangeClient
  def initialize
    @collection = Mongo::Client.new([ '127.0.0.1:27017' ], database: :exchange_data).database[:xmr_btc]
  end

  def insert(document)
    document = JSON.parse(document)
    # Stamp the document so the TTL index can expire it
    document['createdAt'] = Time.now
    @collection.insert_one(document)
  end
end
This client connects to our local database, sets the createdAt field for the TTL document expiration, and saves the message to our collection.
With this setup, we can write messages to Kafka, read them at the other end of the queue, and write them into our MongoDB collection.