Setting up Kafka

At the time of this writing, the stable version of Kafka is 0.8.1. The prerequisites for running Kafka are a ZooKeeper ensemble and Java version 1.6 or above. Kafka comes with a convenience script that can start a single-node ZooKeeper instance, but it is not recommended for use in a production environment. We will be using the ZooKeeper cluster we deployed in the Setting up a ZooKeeper cluster section of Chapter 2, Setting Up a Storm Cluster.

We will first see how to set up a single-node Kafka cluster and then how to add two more nodes to it to run a full-fledged, three-node Kafka cluster with replication enabled.

Setting up a single-node Kafka cluster

The following are the steps to set up a single-node Kafka cluster:

  1. Download the Kafka 0.8.1.1 binary distribution named kafka_2.8.0-0.8.1.1.tgz from http://kafka.apache.org/downloads.html.
  2. Extract the archive to where you want to install Kafka with the following command:
    tar -xvzf kafka_2.8.0-0.8.1.1.tgz
    cd kafka_2.8.0-0.8.1.1
    

    We will refer to the Kafka installation directory as $KAFKA_HOME from now on.

  3. Change the following properties in the Kafka server properties file, server.properties, located at $KAFKA_HOME/config:
    log.dirs=/var/kafka-logs
    zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

    Here, zoo1, zoo2, and zoo3 represent the hostnames of the ZooKeeper nodes. The following are the definitions of the important properties in the server.properties file:

    • broker.id: This is a unique integer ID for each broker in a Kafka cluster.
    • port: This is the port number for a Kafka broker. Its default value is 9092. If you want to run multiple brokers on a single machine, give a unique port to each broker.
    • host.name: This is the hostname to which the broker should bind and advertise itself.
    • log.dirs: The name of this property is a bit unfortunate, as it represents not Kafka's application log directory, but the directory where Kafka stores its actual data. It takes a single directory or a comma-separated list of directories in which to store data. Kafka throughput can be increased by attaching multiple physical disks to the broker node and specifying one data directory on each disk. There is little benefit in specifying multiple directories on the same physical disk, as all the I/O will still happen on that one disk.
    • num.partitions: This represents the default number of partitions for newly created topics. This property can be overridden when creating new topics. A greater number of partitions results in greater parallelism at the cost of a larger number of files. By default, this value is set to 1.
    • log.retention.hours: Kafka does not delete messages as soon as consumers consume them; it retains them for the number of hours defined by this property so that consumers can replay the messages from Kafka in case of any issues. The default value is 168 hours (one week). Alternatively, you can use the log.retention.minutes property to specify the retention period in minutes, or the log.retention.bytes property to specify retention in terms of log size.
    • zookeeper.connect: This is the comma-separated list of ZooKeeper nodes in the hostname:port form.
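Putting these properties together, a minimal server.properties for this single-node setup might look like the following (the broker ID, port, paths, and retention value shown here are just the defaults and example values used in this chapter):

```properties
# Unique integer ID for this broker in the cluster
broker.id=0
# Port the broker listens on (default)
port=9092
# Directory (or comma-separated directories) where Kafka stores its data
log.dirs=/var/kafka-logs
# Default partition count for newly created topics
num.partitions=1
# Retain messages for one week
log.retention.hours=168
# ZooKeeper ensemble in hostname:port form
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
```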
  4. Start the Kafka server by running the following command:
    ./bin/kafka-server-start.sh config/server.properties
    

    The following information is displayed:

    [2014-06-28 09:40:21,954] INFO Verifying properties (kafka.utils.VerifiableProperties)
    [2014-06-28 09:40:22,094] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
    [2014-06-28 09:40:24,190] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
    [2014-06-28 09:40:24,307] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
    

    If you get something similar to the preceding lines on your console, then your Kafka broker is up and running and we can proceed to test it.

  5. Now, we will verify that the Kafka broker has been set up correctly by sending and receiving a test message.

    First, let's create a verification topic for testing by executing the following command:

    ./bin/kafka-topics.sh --create --zookeeper zoo1:2181 --partitions 1 --replication-factor 1 --topic verification-topic
    

    We will receive the following output:

    Created topic "verification-topic".
    

    Now, let's verify that the topic creation was successful by listing all the topics:

    ./bin/kafka-topics.sh --zookeeper zoo1:2181 --list
    

    We will receive the following output:

    verification-topic
    

    Now that the topic is created, let's produce sample messages to Kafka. Kafka comes with a command-line producer that we can use to produce messages as follows:

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic verification-topic
    

    Write the following messages on the console:

    Message 1
    Test Message 2
    Message 3
    

    Let's consume these messages by starting a console consumer in a new console window with the following command:

    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic verification-topic --from-beginning
    

    The following output is displayed on the console:

    Message 1
    Test Message 2
    Message 3
    

    Now, as you type any message on the producer console, it will automatically be consumed by this consumer and displayed on the command line.
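As an aside on how messages map to partitions: when a message has a key, Kafka producers pick the partition by hashing the key modulo the partition count, which keeps all messages for a given key in order on a single partition. The following Python sketch models that idea; it is illustrative only, since it uses CRC32 rather than Kafka's actual hash function, and the 0.8 console producer sends keyless messages, which Kafka spreads across partitions instead:

```python
import zlib

def choose_partition(key, num_partitions):
    """Illustrative key-based partitioner: a stable hash of the key,
    modulo the partition count. Kafka's real producers use their own
    hash functions, but the modulo idea is the same."""
    if key is None:
        # Without a key there is no deterministic mapping; Kafka spreads
        # such messages across partitions (randomly or round-robin,
        # depending on the producer version).
        raise ValueError("keyless messages have no deterministic partition")
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# The same key always maps to the same partition, so per-key ordering
# is preserved within that partition.
p1 = choose_partition("user-42", 3)
p2 = choose_partition("user-42", 3)
```

Because the mapping is a pure function of the key and the partition count, adding partitions to a topic later changes where keys land, which is why the partition count is usually chosen up front.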

Note

Using Kafka's single-node ZooKeeper instance

If you don't want to use an external ZooKeeper ensemble, you can use the single-node ZooKeeper instance that comes with Kafka for quick and dirty development. To start using it, first modify the zookeeper.properties file at $KAFKA_HOME/config to specify the data directory by supplying the following property:

dataDir=/var/zookeeper

Now, you can start the ZooKeeper instance with the following command:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Setting up a three-node Kafka cluster

Now that we have a single-node Kafka cluster, let's see how we can set up a multinode Kafka cluster using the following steps:

  1. Download and extract Kafka on each of the three nodes by following steps 1 and 2 of the previous section.
  2. Change the following properties in the Kafka server properties file, server.properties, at $KAFKA_HOME/config:
    broker.id=0
    port=9092
    host.name=kafka1
    log.dirs=/var/kafka-logs
    zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

    Make sure that the values of the broker.id and host.name properties are unique for each Kafka broker.

  3. Start the Kafka brokers by executing the following command on each of the three nodes:
    ./bin/kafka-server-start.sh config/server.properties
    
  4. Now, let's verify the setup. First, we create a topic with the following command:
    ./bin/kafka-topics.sh --create --zookeeper zoo1:2181 --partitions 3 --replication-factor 1 --topic verification
    

    We will receive the following output:

    Created topic "verification".
    

    Now, we will describe the topic to verify that it was created successfully and to see how its partitions were assigned to the brokers, using the following command:

    ./bin/kafka-topics.sh --describe --zookeeper zoo1:2181 --topic verification
    

    The following information is displayed:

    Topic:verification PartitionCount:3  ReplicationFactor:1  Configs:
    Topic: verification Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: verification Partition: 1  Leader: 1  Replicas: 1  Isr: 1
    Topic: verification Partition: 2  Leader: 2  Replicas: 2  Isr: 2

    Here, Leader is the ID of the broker currently serving the partition's reads and writes, Replicas lists the brokers holding a copy of that partition, and Isr is the subset of replicas currently in sync with the leader. With a replication factor of 1, each partition has a single replica, which is also its leader.
    

    Now, we will verify the setup by using the Kafka console producer and consumer, as in the previous section. Start the producer with the following command:

    ./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic verification
    

    Here, kafka1, kafka2, and kafka3 are the hostnames of the Kafka brokers. Write the following messages on the console:

    First
    Second
    Third
    

    Let's consume these messages by starting a new console consumer on a new console window as follows:

    ./bin/kafka-console-consumer.sh --zookeeper zoo1:2181 --topic verification --from-beginning
    

    We will receive the following output:

    First
    Second
    Third
    

So now, we have a working three-broker Kafka cluster. In the next section, we will see how to write a producer that can produce messages to Kafka.

Running multiple Kafka brokers on a single node

If you don't have multiple machines and you want to test how partitions are distributed among various brokers, then you can run multiple Kafka brokers on a single node. The following are the steps to set up multiple Kafka brokers on a single node:

  1. Copy the server.properties file in the config folder to create the server1.properties and server2.properties files in the same folder.
  2. Populate the following properties in the server.properties file:
    broker.id=0
    port=9092
    log.dirs=/var/kafka-logs
    zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
  3. Populate the following properties in the server1.properties file:
    broker.id=1
    port=9093
    log.dirs=/var/kafka-1-logs
    zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
  4. Populate the following properties in the server2.properties file:
    broker.id=2
    port=9094
    log.dirs=/var/kafka-2-logs
    zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
  5. Run the following commands in three different terminals to start the Kafka brokers:
    ./bin/kafka-server-start.sh config/server.properties
    ./bin/kafka-server-start.sh config/server1.properties
    ./bin/kafka-server-start.sh config/server2.properties
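With all three local brokers up, you can create multi-partition topics and watch how the partitions spread across the brokers. The following Python sketch is a simplified model of Kafka's round-robin replica assignment; the real algorithm also picks a random starting broker and shifts follower replicas, so actual assignments will differ, but the even-spreading principle is the same:

```python
def assign_partitions(broker_ids, num_partitions, replication_factor):
    """Simplified round-robin replica assignment: partition i's replicas
    start at broker (i mod n) and continue on the following brokers.
    This mirrors the idea behind Kafka's assignment, not its exact code."""
    n = len(broker_ids)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed broker count")
    assignment = {}
    for p in range(num_partitions):
        # First replica in the list acts as the partition leader.
        assignment[p] = [broker_ids[(p + r) % n] for r in range(replication_factor)]
    return assignment

# Three partitions over the three local brokers, no replication:
layout = assign_partitions([0, 1, 2], num_partitions=3, replication_factor=1)
```

Under this model, each of the three brokers leads exactly one partition, which is why unique broker.id values are essential: the assignment is expressed entirely in terms of broker IDs.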
    