The first step in developing a machine-learning pipeline is to get the data into a place from which it can be fed to the training algorithm. In this case study, we will use Kafka as the source of the training data.
For this, we will write a Kafka producer that streams 80 percent of the records in the data file to the Kafka broker. The remaining 20 percent of the records will be written to a file, which we will later use to test the clustering model created by our topology.
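Each line of the synthetic control dataset is a single instance: a series of whitespace-separated real values (60 per line). The producer below treats each line as an opaque string, but when the held-out 20 percent is later used to test the clustering model, each line will need to be turned into a numeric feature vector. The following is a minimal, hypothetical helper for that step; the class and method names are assumptions for illustration and are not part of the producer code in this section:

import java.util.Arrays;

public class InstanceParser {

    // Hypothetical helper: convert one line of synthetic_control.data
    // (whitespace-separated real values) into a feature vector.
    public static double[] parseInstance(String line) {
        String[] tokens = line.trim().split("\\s+");
        double[] features = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            features[i] = Double.parseDouble(tokens[i]);
        }
        return features;
    }

    public static void main(String[] args) {
        // Illustrative fragment of one instance from the dataset
        double[] features = parseInstance("28.7812 34.4632 31.3381");
        System.out.println(Arrays.toString(features));
    }
}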
We will be creating a Maven project for publishing data into Kafka. The following are the steps for creating the producer:
First, create a new Maven project with the com.learningstorm group ID and the ml-kafka-producer artifact ID.
Next, add the following Apache Kafka dependency to the pom.xml file:

<!-- Apache Kafka Dependency -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.0</version>
  <exclusions>
    <exclusion>
      <groupId>com.sun.jdmk</groupId>
      <artifactId>jmxtools</artifactId>
    </exclusion>
    <exclusion>
      <groupId>com.sun.jmx</groupId>
      <artifactId>jmxri</artifactId>
    </exclusion>
  </exclusions>
</dependency>
Next, add the following plugin to the pom.xml file. It will allow us to execute the producer using Maven:

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>java</executable>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <classpathScope>compile</classpathScope>
    <mainClass>com.learningstorm.ml.kafka.KafkaProducer</mainClass>
  </configuration>
</plugin>
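For reference, the dependency goes in the <dependencies> section and the plugin goes in the <build><plugins> section of pom.xml. A minimal skeleton might look like the following; apart from the group ID, artifact ID, and version (taken from the build output shown later in this section), everything here is a standard Maven default and an assumption about the original project:

<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.learningstorm</groupId>
  <artifactId>ml-kafka-producer</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <!-- Apache Kafka dependency from the previous step goes here -->
  </dependencies>

  <build>
    <plugins>
      <!-- exec-maven-plugin configuration from the previous step goes here -->
    </plugins>
  </build>
</project>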
Now, create the com.learningstorm.ml.kafka.KafkaProducer class that reads the input dataset, produces 80 percent of the data into Kafka to train the model, and writes the remaining data to a file that will be used for predictions later. The following is the code of the KafkaProducer class:

package com.learningstorm.ml.kafka;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Properties;
import java.util.Scanner;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {
    public static void main(String[] args) throws IOException {
        // Build the configuration required for connecting to Kafka
        Properties props = new Properties();

        // List of Kafka brokers
        props.put("metadata.broker.list", "localhost:9092");

        // Serializer used for sending data to Kafka. Since we are sending
        // strings, we are using StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // We want acks from Kafka that messages were properly received
        props.put("request.required.acks", "1");

        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // This is the input file. It should be the path to the file downloaded
        // from the UCI Machine Learning Repository at
        // http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data
        File file = new File("/home/anand/Desktop/synthetic_control.data");
        Scanner scanner = new Scanner(file);

        // This is the output file for the prediction data. Change it to something
        // appropriate for your setup.
        File predictionFile = new File("/home/anand/Desktop/prediction.data");
        BufferedWriter writer = new BufferedWriter(new FileWriter(predictionFile));

        int i = 0;
        while (scanner.hasNextLine()) {
            String instance = scanner.nextLine();
            if (i++ % 5 == 0) {
                // Write every fifth instance (20 percent) to the prediction file
                writer.write(instance + "\n");
            } else {
                // Produce the remaining instances (80 percent) to the "training" topic in Kafka
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("training", instance);
                producer.send(data);
            }
        }

        // Close the files
        scanner.close();
        writer.close();

        // Close the producer
        producer.close();

        System.out.println("Produced data");
    }
}
Now, run the producer by executing the following command from the project's directory:

mvn exec:java
The following output is displayed:
[INFO] ----------------------------------------------------
[INFO] Building ml-kafka-producer 0.0.1-SNAPSHOT
[INFO] ----------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ ml-kafka-producer ---
Produced data
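If you want to confirm that the instances actually reached the broker, you can drain the training topic with a throwaway consumer. The following is a minimal sketch that uses the Kafka 0.8 high-level consumer API; the class name, group ID, and auto.offset.reset setting are assumptions made for this check and are not part of the case study:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TrainingTopicChecker {
    public static void main(String[] args) {
        // Connect through the same ZooKeeper instance that Kafka is using
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "training-topic-checker");
        // Start from the beginning of the topic so already-produced data is visible
        props.put("auto.offset.reset", "smallest");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for a single stream for the "training" topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("training", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                consumer.createMessageStreams(topicCountMap);

        // Print each instance produced to the topic. This loop blocks while
        // waiting for new messages; stop it with Ctrl + C.
        for (MessageAndMetadata<byte[], byte[]> record : streams.get("training").get(0)) {
            System.out.println(new String(record.message()));
        }
    }
}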
Now, verify that the training topic has been created on Kafka by executing the following command from the Kafka installation directory:

./bin/kafka-list-topic.sh --zookeeper localhost:2181
The following output is displayed:
topic: training partition: 0 leader: 0 replicas: 0 isr: 0