Producer - putting messages into Kafka

The default producer configuration bundled with Kafka is located at ${KAFKA_HOME}/config/producer.properties and is shown below:

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

Code 01: Console Producer Configuration
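
Most entries in Code 01 begin with #, so they are comments and the producer falls back to its built-in defaults for them. The following is a minimal sketch of how such a file is read with java.util.Properties. The class name ProducerConfigLoader and the abridged SAMPLE text are illustrative assumptions, not part of the book's repository.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ProducerConfigLoader {

    // Hypothetical stand-in for config/producer.properties, abridged from Code 01.
    static final String SAMPLE = String.join("\n",
            "# list of brokers used for bootstrapping knowledge about the rest of the cluster",
            "bootstrap.servers=localhost:9092",
            "compression.type=none",
            "#linger.ms=",
            "#batch.size=");

    // Parses properties text; lines whose first character is '#' are comments and are skipped.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked to keep the sketch short.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        // Only the two uncommented keys are present.
        System.out.println(props.stringPropertyNames());
        // The commented-out entry was never set, so the lookup returns null.
        System.out.println(props.getProperty("linger.ms"));
    }
}
```

To read the real file instead of the sample text, a FileReader over ${KAFKA_HOME}/config/producer.properties could be passed to Properties.load in the same way.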

The configuration in Code 01 is the basic configuration for a Kafka producer. The most important parameters, and the only required ones, are:

  1. bootstrap.servers: The host:port addresses of the Kafka brokers used for the initial connection. In a multi-broker Kafka cluster, several brokers are given as a comma-separated list.
  2. key.serializer: The fully qualified class name of the serializer used for the key of the message. The default for the console producer is org.apache.kafka.common.serialization.StringSerializer.
  3. value.serializer: The fully qualified class name of the serializer used for the message itself (the value). The default for the console producer is org.apache.kafka.common.serialization.StringSerializer.
Serialization is the process of converting an object into a stream of bytes in order to store the object or transmit it to memory, a database, or a file. https://goo.gl/eDQE5A (https://docs.microsoft.com)
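
To make the idea concrete, the following is a minimal sketch of what a string serializer does, using only the Java standard library. It is not Kafka's StringSerializer itself. The class and method names are illustrative, and UTF-8 is chosen here on the assumption that it matches the encoding Kafka's StringSerializer uses unless configured otherwise.

```java
import java.nio.charset.StandardCharsets;

public class StringSerializationSketch {

    // Object -> bytes: roughly what a string serializer does before a record is sent.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes -> object: the inverse step that a consumer would perform.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("A Sample Message...");
        System.out.println(wire.length + " bytes on the wire");
        System.out.println(deserialize(wire));
    }
}
```

The broker only ever stores and forwards byte arrays, which is why both a key serializer and a value serializer must be configured on the producer.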

A simple producer can be built easily by using Kafka libraries as follows:

  1. Check out the latest code from the source repository using the following command:

    git clone https://github.com/PacktPublishing/Data-Lakes-for-Enterprises

  2. If the repository is already cloned, pull the latest source code with the following command:

    git pull

  3. Within the repository, the source code for this chapter is under the folder named chapter07.
  4. The following is the Maven dependency for a simple producer, declared in chapter07/pom.xml:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.1</version>
    </dependency>
</dependencies>
Code 02: Maven Dependencies for Simple Producer
  5. A simple producer Java implementation is shown in the following code snippet:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public static void main(String[] args) {
        Properties props = new Properties();

        /*
         Set the list of broker addresses separated by commas.
         This needs to be updated with the IP of your VM running the Kafka broker.
        */
        props.setProperty("bootstrap.servers", "192.168.0.117:9092");

        // Set the serializer for the key of the message
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Set the serializer for the message (value)
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Create a producer
        Producer<String, String> producer = new KafkaProducer<>(props);

        // Create a message (topic, key, value) to be sent to a topic
        ProducerRecord<String, String> message =
                new ProducerRecord<>("customer", "001", "A Sample Message...");

        // Send the message; send() is asynchronous and returns immediately
        producer.send(message);

        System.out.println("Message Published");

        // Close the producer connection; close() waits for previously sent records to complete
        producer.close();
    }
}
Code 03: A Simple Producer

A few observations on the preceding example:

  • The minimum required configuration for a Kafka producer is supplied as a Properties object. It could have been provided as a Map as well.
  • We use StringSerializer, the console producer's default, for both the message key and its value.
  • While publishing the message, we specify the topic as well as the key of the message.
  • The topic specified while sending the message is created if it does not already exist, provided that automatic topic creation (auto.create.topics.enable, true by default) has not been disabled on the broker.
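
The first observation can be illustrated with a small sketch that holds the same three settings in a Map. To stay runnable without a broker or the kafka-clients library, it only builds and compares the two configuration forms. The class name ProducerConfigAsMap and its helper methods are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ProducerConfigAsMap {

    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    // The three settings from Code 03, held in a Map instead of a Properties object.
    static Map<String, Object> asMap(String brokers) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", brokers);
        config.put("key.serializer", STRING_SERIALIZER);
        config.put("value.serializer", STRING_SERIALIZER);
        return config;
    }

    // The Properties form used in Code 03, for comparison.
    static Properties asProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> config = asMap("192.168.0.117:9092");
        // Properties is itself a Map, so the two forms can be compared entry by entry.
        System.out.println(config.equals(asProperties("192.168.0.117:9092")));
        // With kafka-clients on the classpath, the map would be passed to the producer as:
        //   Producer<String, String> producer = new KafkaProducer<>(config);
    }
}
```

A Map is convenient when the configuration is assembled programmatically, while a Properties object is the natural choice when the settings are loaded from a file such as producer.properties.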
  6. For the simple producer to publish messages to a remote broker process, the following setting must be changed in the Kafka server's ${KAFKA_HOME}/config/server.properties so that the server binds to the correct external IP rather than to localhost.
Figure 14: Binding Host (external IP) configuration of Kafka Server
  7. The preceding source code can be run by executing the main method in an IDE of your choice. The only precondition is that the Kafka broker must be running. Make sure that the broker IP in the SimpleProducer class (line 18) is changed before running the class. The console output shows Message Published once the program has sent the message to the topic.
  8. The published message can be verified from the Kafka console by running the following command:
${KAFKA_HOME}/bin/kafka-console-consumer.sh --topic customer --bootstrap-server <broker-ip>:9092 --from-beginning

Figure 15: Output of console consumer for Messages published by Simple Producer