Producing a training dataset into Kafka

The first step in developing a machine-learning pipeline is to get the data into a place from which we can feed it to the training algorithm. In this case study, we will be using Kafka as the source of the training data.

For this, we will write a Kafka producer that streams 80 percent of the instances in the data file to the Kafka broker. The remaining 20 percent will be written to a separate file, which we will later use to test the clustering model created by our topology.
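Before wiring in Kafka, the split itself is easy to see in isolation: every fifth line is held out for prediction and the rest becomes training data. The following is a minimal, Kafka-free sketch of that routing logic; the `SplitSketch` class name and the in-memory lists are illustrative only and not part of the book's code. With the 600-instance synthetic control dataset, this yields a 480/120 split:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitSketch {

  // Route every fifth line (indexes 0, 5, 10, ...) to the prediction
  // set and the rest to the training set, mirroring the modulo check
  // used by the producer later in this section.
  public static List<List<String>> split(List<String> lines) {
    List<String> training = new ArrayList<String>();
    List<String> prediction = new ArrayList<String>();
    int i = 0;
    for (String line : lines) {
      if (i++ % 5 == 0) {
        prediction.add(line); // 20 percent: held out for testing
      } else {
        training.add(line); // 80 percent: sent to Kafka
      }
    }
    return Arrays.asList(training, prediction);
  }

  public static void main(String[] args) {
    // Simulate the 600-instance synthetic control dataset
    List<String> lines = new ArrayList<String>();
    for (int n = 0; n < 600; n++) {
      lines.add("instance-" + n);
    }
    List<List<String>> parts = split(lines);
    System.out.println(parts.get(0).size() + " training, "
        + parts.get(1).size() + " prediction");
  }
}
```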

We will be creating a Maven project for publishing data into Kafka. The following are the steps for creating the producer:

  1. Create a new Maven project with the com.learningstorm group ID and the ml-kafka-producer artifact ID.
  2. Add the following dependencies for Kafka in the pom.xml file:
    <!-- Apache Kafka Dependency -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0</version>
      <exclusions>
        <exclusion>
          <groupId>com.sun.jdmk</groupId>
          <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.sun.jmx</groupId>
          <artifactId>jmxri</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  3. Add the following build plugin to the pom.xml file. This will allow us to execute the producer using Maven:
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>com.learningstorm.ml.kafka.KafkaProducer</mainClass>
      </configuration>
    </plugin>
  4. Now, we will create the com.learningstorm.ml.kafka.KafkaProducer class, which reads the input dataset, produces 80 percent of the data into Kafka for training the model, and writes the remaining data to a file that will be used for predictions later. The following is the code of the KafkaProducer class:
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Properties;
    import java.util.Scanner;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class KafkaProducer {

      public static void main(String[] args) throws IOException {

        // Build the configuration required for connecting to Kafka
        Properties props = new Properties();

        // List of Kafka brokers
        props.put("metadata.broker.list", "localhost:9092");

        // Serializer used for sending data to Kafka. Since we are sending
        // strings, we are using StringEncoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // We want acks from Kafka that messages have been properly received
        props.put("request.required.acks", "1");

        // Create the producer instance
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        // This is the input file. This should be the path to the file downloaded
        // from the UCI Machine Learning Repository at
        // http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data
        File file = new File("/home/anand/Desktop/synthetic_control.data");
        Scanner scanner = new Scanner(file);

        // This is the output file for the prediction data. Change it to
        // something appropriate for your setup.
        File predictionFile = new File("/home/anand/Desktop/prediction.data");
        BufferedWriter writer = new BufferedWriter(new FileWriter(predictionFile));

        int i = 0;
        while (scanner.hasNextLine()) {
          String instance = scanner.nextLine();
          if (i++ % 5 == 0) {
            // Write every fifth instance to the prediction file
            writer.write(instance + "\n");
          } else {
            // Produce the instance to the "training" topic in Kafka
            KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("training", instance);
            producer.send(data);
          }
        }

        // Close the files
        scanner.close();
        writer.close();

        // Close the producer
        producer.close();

        System.out.println("Produced data");
      }
    }
  5. Now that the producer is ready, make sure Kafka is running on your system.
  6. Now, run the producer with the following command:
    mvn exec:java
    

    The following output is displayed:

    [INFO] ----------------------------------------------------
    [INFO] Building ml-kafka-producer 0.0.1-SNAPSHOT
    [INFO] ----------------------------------------------------
    [INFO]
    [INFO] --- exec-maven-plugin:1.2.1:java (default-cli) @ ml-kafka-producer ---
    Produced data
    
  7. Now, let's verify that the data has been produced into Kafka by executing the following command and verifying that the topic has been created:
    ./bin/kafka-list-topic.sh  --zookeeper localhost:2181
    

    The following output is displayed:

    topic: training  partition: 0  leader: 0  replicas: 0  isr: 0
    
  8. The file that will be used for prediction should also have been generated at the path given in the class. Verify that it exists and that it contains one in every five lines of the input dataset.
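Listing the topic confirms that it exists, but not what it contains. To inspect the produced messages themselves, you can use the console consumer that ships with Kafka 0.8, assuming the same single-node setup on localhost used throughout this section:

```shell
# Dump all messages on the "training" topic from the beginning;
# press Ctrl + C to stop once you have seen enough instances.
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
    --topic training --from-beginning
```

Each line of output should be one instance from the synthetic control dataset, matching the strings sent by the producer.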