At the time of this writing, the stable version of Kafka is 0.8.1. The prerequisites for running Kafka are a ZooKeeper ensemble and Java Version 1.6 or above. Kafka comes with a convenience script that can start a single-node ZooKeeper, but it is not recommended for use in a production environment. We will be using the ZooKeeper cluster we deployed in the Setting up a ZooKeeper cluster section of Chapter 2, Setting Up a Storm Cluster.
We will first see how to set up a single-node Kafka cluster and then how to add two more nodes to it to run a full-fledged, three-node Kafka cluster with replication enabled.
The following are the steps to set up a single-node Kafka cluster:
1. Download the kafka_2.8.0-0.8.1.1.tgz release from http://kafka.apache.org/downloads.html and extract it using the following commands:

tar -xvzf kafka_2.8.0-0.8.1.1.tgz
cd kafka_2.8.0-0.8.1.1

We will refer to the Kafka installation directory as $KAFKA_HOME from now onwards.
2. Change the following properties in the server.properties file, placed at $KAFKA_HOME/config:

log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

Here, zoo1, zoo2, and zoo3 represent the hostnames of the ZooKeeper nodes. The following are the definitions of the important properties in the server.properties file:
- broker.id: This is a unique integer ID for each broker in a Kafka cluster.
- port: This is the port number for a Kafka broker. Its default value is 9092. If you want to run multiple brokers on a single machine, give a unique port to each broker.
- host.name: This is the hostname to which the broker should bind and advertise itself.
- log.dirs: The name of this property is a bit unfortunate, as it represents not the log directory for Kafka, but the directory where Kafka stores its actual data. This can take a single directory or a comma-separated list of directories to store data. Kafka throughput can be increased by attaching multiple physical disks to the broker node and specifying multiple data directories, each lying on a different disk. It is not of much use to specify multiple directories on the same physical disk, as all the I/O will still be happening on the same disk.
- num.partitions: This represents the default number of partitions for newly created topics. This property can be overridden when creating new topics. A greater number of partitions results in greater parallelism at the cost of a larger number of files. By default, this value is set to 1.
- log.retention.hours: Kafka does not delete messages immediately after consumers consume them. It retains them for the number of hours defined by this property so that, in case of any issues, the consumers can replay the messages from Kafka. The default value is one week. Alternatively, you can use the log.retention.minutes property to specify the retention policy in minutes, or the log.retention.bytes property to specify the retention policy in terms of topic size.
- zookeeper.connect: This is the comma-separated list of ZooKeeper nodes in the hostname:port form.

3. Start the Kafka broker by running the following command from $KAFKA_HOME:

./bin/kafka-server-start.sh config/server.properties
The following information is displayed:
[2014-06-28 09:40:21,954] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-06-28 09:40:22,094] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-06-28 09:40:24,190] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-06-28 09:40:24,307] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
If you get something similar to the preceding lines on your console, then your Kafka broker is up and running and we can proceed to test it.
First, let's create a verification topic for testing by executing the following command:
./bin/kafka-topics.sh --create --zookeeper zoo1:2181 --partitions 1 --replication-factor 1 --topic verification-topic
We will receive the following output:
creation succeeded!
Now, let's verify that the topic creation was successful by listing all the topics:
./bin/kafka-topics.sh --zookeeper zoo1:2181 --list
We will receive the following output:
verification-topic
Now that the topic is created, let's produce sample messages to Kafka. Kafka comes with a command-line producer that we can use to produce messages as follows:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic verification-topic
Write the following messages on the console:
Message 1
Test Message 2
Message 3
Let's consume these messages by starting a console consumer in a new console window with the following command:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic verification-topic --from-beginning
The following output is displayed on the console:
Message 1
Test Message 2
Message 3
Now, as you type any message on the producer console, it will automatically be consumed by this consumer and displayed on the command line.
Using Kafka's single-node ZooKeeper instance
If you don't want to use an external ZooKeeper ensemble, you can use the single-node ZooKeeper instance that comes with Kafka for quick and dirty development. To start using it, first modify the zookeeper.properties file at $KAFKA_HOME/config to specify the data directory by supplying the following property:
dataDir=/var/zookeeper
Now, you can start the ZooKeeper instance with the following command:
./bin/zookeeper-server-start.sh config/zookeeper.properties
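If you point a broker at this bundled instance, the zookeeper.connect property in server.properties (described earlier) would reference the local node rather than the external ensemble, assuming the default clientPort of 2181 from the bundled zookeeper.properties file:

```
zookeeper.connect=localhost:2181
```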
Now that we have a single-node Kafka cluster, let's see how we can set up a multinode Kafka cluster using the following steps:

1. Change the following properties in the server.properties file at $KAFKA_HOME/config:

broker.id=0
port=9092
host.name=kafka1
log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
Make sure that the value of the broker.id property is unique for each Kafka broker.
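One way to keep broker.id unique across hosts is to derive it from the hostname. The following is a small sketch, assuming hostnames that end in a unique number (kafka1, kafka2, and so on); the helper function name and the pattern are illustrative and should be adapted to your own naming scheme:

```shell
# Sketch: derive a numeric broker.id from a hostname such as kafka1, kafka2, ...
# Assumes the hostname ends in a unique number; otherwise the input is
# returned unchanged and should not be used as a broker.id.
hostname_to_broker_id() {
  printf '%s\n' "$1" | sed 's/^[^0-9]*\([0-9][0-9]*\)$/\1/'
}

hostname_to_broker_id kafka1   # prints 1
hostname_to_broker_id kafka2   # prints 2
```

The resulting ID could then be substituted into server.properties on each node, for example with sed on the broker.id line, instead of editing each file by hand.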
2. Start the Kafka broker on each node by running the following command from $KAFKA_HOME:

./bin/kafka-server-start.sh config/server.properties
3. Create a topic with three partitions by running the following command:

./bin/kafka-topics.sh --create --zookeeper zoo1:2181 --partitions 3 --replication-factor 1 --topic verification
We will receive the following output:
creation succeeded!
Now, we will describe the topic to verify that it was created successfully and to see how its partitions are distributed among the brokers, using the following command:
./bin/kafka-topics.sh --describe --zookeeper zoo1:2181 --topic verification
The following information is displayed:
Topic:verification	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: verification	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: verification	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: verification	Partition: 2	Leader: 2	Replicas: 2	Isr: 2
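As a quick sanity check, the describe output can be parsed to confirm that partition leadership is spread across all three brokers. The following sketch embeds a sample of the output so it runs on its own; in practice, you would pipe the live output of kafka-topics.sh --describe instead:

```shell
# Count distinct partition leader IDs in `kafka-topics.sh --describe` output.
# The sample below stands in for the live command output.
describe_output='Topic: verification Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: verification Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: verification Partition: 2 Leader: 2 Replicas: 2 Isr: 2'

distinct_leaders=$(printf '%s\n' "$describe_output" |
  awk '{ for (i = 1; i <= NF; i++)
           if ($i == "Leader:" && !($(i+1) in seen)) { seen[$(i+1)] = 1; n++ } }
       END { print n }')
echo "distinct leaders: $distinct_leaders"   # distinct leaders: 3
```

A result lower than the broker count would mean some brokers are not leading any partition, which limits the parallelism the topic can achieve.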
Now, we will verify the setup by using the Kafka console producer and consumer as done in the previous section using the following command:
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic verification
Here, kafka1, kafka2, and kafka3 are the hostnames of the Kafka brokers. Write the following messages on the console:
First
Second
Third
Let's consume these messages by starting a new console consumer on a new console window as follows:
./bin/kafka-console-consumer.sh --zookeeper zoo1:2181 --topic verification --from-beginning
We will receive the following output:
First
Second
Third
So now, we have a working three-broker Kafka cluster. In the next section, we will see how to write a producer that can produce messages to Kafka.
If you don't have multiple machines and you want to test how partitions are distributed among various brokers, then you can run multiple Kafka brokers on a single node. The following are the steps to set up multiple Kafka brokers on a single node:
1. Copy the server.properties file from the config folder to create the server1.properties and server2.properties files in the config folder.

2. Set the following properties in the server.properties file:

broker.id=0
port=9092
log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

3. Set the following properties in the server1.properties file:

broker.id=1
port=9093
log.dirs=/var/kafka-1-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

4. Set the following properties in the server2.properties file:

broker.id=2
port=9094
log.dirs=/var/kafka-2-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

5. Start the three brokers by running the following commands:

./bin/kafka-server-start.sh config/server.properties
./bin/kafka-server-start.sh config/server1.properties
./bin/kafka-server-start.sh config/server2.properties
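The copy-and-edit steps above can also be sketched as a small script that generates the two extra config files from the base one. A minimal stand-in base file is created here so the sketch runs on its own; in a real setup, you would start from the existing $KAFKA_HOME/config/server.properties instead:

```shell
# Sketch: generate server1.properties and server2.properties from a base
# server.properties. The stand-in base file below keeps the script
# self-contained; delete that part when running against a real installation.
mkdir -p config
cat > config/server.properties <<'EOF'
broker.id=0
port=9092
log.dirs=/var/kafka-logs
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
EOF

# Each extra broker gets a unique broker.id, port, and data directory.
for i in 1 2; do
  sed -e "s/^broker.id=.*/broker.id=$i/" \
      -e "s/^port=.*/port=$((9092 + i))/" \
      -e "s|^log.dirs=.*|log.dirs=/var/kafka-$i-logs|" \
      config/server.properties > "config/server$i.properties"
done

grep '^port=' config/server1.properties config/server2.properties
# config/server1.properties:port=9093
# config/server2.properties:port=9094
```

This keeps the zookeeper.connect value identical across all three files, which is what lets the brokers find each other and form one cluster.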