Building a Trident topology to create the clustering model

Now that we have the training and prediction data in place, we will develop the Trident topology using the Trident-ML library.

Again, we will create a Maven project to implement our topology. The following are the steps to create this project:

  1. Create a new Maven project with the com.learningstorm group ID and the ml artifact ID.
  2. Add the following dependencies for Storm, the Storm-Kafka spout, and Trident-ML in the pom.xml file:
    <!-- Dependency for Storm -->
    <dependency>
      <groupId>storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.0.1</version>
      <scope>provided</scope>
    </dependency>
    
    <!-- Dependency for Storm-Kafka spout -->
    <dependency>
      <groupId>net.wurstmeister.storm</groupId>
      <artifactId>storm-kafka-0.8-plus</artifactId>
      <version>0.4.0</version>
    </dependency>
    
    <!-- Dependency for Trident-ML -->
    <dependency>
      <groupId>com.github.pmerienne</groupId>
      <artifactId>trident-ml</artifactId>
      <version>0.0.4</version>
    </dependency>
  3. Add the following repository in the pom.xml file:
    <repository>
      <id>clojars.org</id>
      <url>http://clojars.org/repo</url>
    </repository>
  4. Add the following build plugin to the pom.xml file. This will allow us to execute the Trident topology in the local mode using Maven:
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <executions>
        <execution>
          <goals>
            <goal>exec</goal>
          </goals>
        </execution>
      </executions>
      <configuration>
        <executable>java</executable>
        <includeProjectDependencies>true</includeProjectDependencies>
        <includePluginDependencies>false</includePluginDependencies>
        <classpathScope>compile</classpathScope>
        <mainClass>com.learningstorm.ml.TridentMLTopology</mainClass>
      </configuration>
    </plugin>
  5. The Trident-ML library takes the input (for both model building and later prediction) as objects of the com.github.pmerienne.trident.ml.core.Instance class. Let's create the com.learningstorm.ml.FeaturesToValues class, which will convert the first string field of the tuple into an Instance object by splitting the string on whitespace and converting each number into a double value. The following is the code for the FeaturesToValues class:
    public class FeaturesToValues extends BaseFunction {
    
      @SuppressWarnings("rawtypes")
      public void execute(TridentTuple tuple, TridentCollector collector) {
        // get the input string
        String line = tuple.getString(0);
    
        double[] features = new double[60];
    
        // split the input string on whitespace and convert each part to a double
        String[] featureList = line.split("\\s+");
        for(int i = 0; i < features.length; i++){
          features[i] = Double.parseDouble(featureList[i]);
        }
    
        // emit the Instance object with the features from given input string
        collector.emit(new Values(new Instance(features)));
      }
    }
  6. Now, we will create the Trident topology that will create the K-means clustering model and will also expose this model as a DRPC call so that the model can be used to predict the class for the test data. Create the com.learningstorm.ml.TridentMLTopology class with the following code:
    public class TridentMLTopology {
    
      public static void main(String[] args) throws InterruptedException, IOException {
        // Kafka Spout definition
        // Specify the zk hosts for Kafka, change as needed
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
    
        // Specify the topic name for training and the client id
        // here topic name is 'training' and client id is 'storm'
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "training", "storm");
    
        // We will always consume from start so that we can run the topology multiple times while debugging. In production, this should be false.
        kafkaConfig.forceFromStart = true;
    
        // We have string data in the kafka, so specify string scheme here
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
        // Define the spout for reading from kafka
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);
    
        // Topology definition
        // now we will define the topology that will build the clustering model
        TridentTopology topology = new TridentTopology();
    
        // Training stream:
        // 1. Read a string from Kafka
        // 2. Convert the Trident tuple into an Instance
        // 3. Update the state of the clusterer
        // We are using KMeans(6) as we want to cluster the data into 6 categories
        TridentState kmeansState = topology.newStream("samples", kafkaSpout)
            .each(new Fields("str"), new FeaturesToValues(), new Fields("instance"))
            .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"),
                new ClusterUpdater("kmeans", new KMeans(6)));
    
        // Now we will build a LocalDRPC instance that will be used to predict the cluster of a tuple
        LocalDRPC localDRPC = new LocalDRPC();

        // Clustering stream:
        // 1. Define a new DRPC stream with the function name 'predict'
        // 2. Convert the DRPC arguments into an Instance
        // 3. Query the clusterer state to classify the instance
        topology.newDRPCStream("predict", localDRPC)
            .each(new Fields("args"), new FeaturesToValues(), new Fields("instance"))
            .stateQuery(kmeansState, new Fields("instance"), new ClusterQuery("kmeans"), new Fields("prediction"));
    
        // Create a new local cluster for testing
        LocalCluster cluster = new LocalCluster();
    
        // submit the topology for execution
        cluster.submitTopology("kmeans", new Config(), topology.build());
    
        // give the topology enough time to create the clustering model
        Thread.sleep(10000);
    
        // Create the prediction consumer; change the input and output file paths as needed
        PredictionConsumer predictionConsumer = new PredictionConsumer(localDRPC, "/home/anand/Desktop/prediction.data", "/home/anand/Desktop/predicted.data");
    
        // Predict and write the output
        predictionConsumer.predict();
    
        // shutdown cluster and drpc
        cluster.shutdown();
        localDRPC.shutdown();
      }
    }
  7. Now that the topology is ready, let's create a consumer that will predict the category for the test data generated in the last section. For this, create the com.learningstorm.ml.PredictionConsumer class with the following code:
    public class PredictionConsumer {
      // drpc instance used for prediction
      private final LocalDRPC drpc;
    
      // input file, generated by kafka producer for prediction
      private final String input;
    
      // output file, where the predicted data will be stored
      private final String output;
    
      public PredictionConsumer(LocalDRPC drpc, String input, String output) {
        this.drpc = drpc;
        this.input = input;
        this.output = output;
      }
    
      /**
      * This method predicts the categories for the records in the input file and writes them to the output file.
      */
      public void predict() throws IOException{
        // Scanner on the input file
        Scanner scanner = new Scanner(new File(input));
    
        // Writer for the output
        BufferedWriter writer = new BufferedWriter(new FileWriter(new File(output)));
    
        while(scanner.hasNextLine()){
          String line = scanner.nextLine();
          if(line.trim().isEmpty()){
            // empty line, skip
            continue;
          }

          // predict the category for this line
          String prediction = drpc.execute("predict", line);

          // write the predicted category for this line
          writer.write(prediction + "\n");
        }
    
        // close the scanner and writer
        scanner.close();
        writer.close();
      }
    }
  8. Now that we have all the components in place, we can run the topology. When run, it will first create the clustering model and then classify the test data generated earlier using that model. To run it using Maven, execute the following command:
    mvn exec:java
    

    Note

    If we are not running DRPC in the local mode (that is, if we are not using LocalDRPC), we will need to launch the DRPC server before running the topology. The following are the steps to run the DRPC server in the clustered mode:

    1. Start the DRPC server with the following command:

    bin/storm drpc
    

    2. Add the DRPC servers to the storm.yaml file with the following entry:

    drpc.servers:
      - "server1"
      - "server2"
    
  9. After running the preceding command, you should be able to see the output file with the classified examples. Let's look at the first line of that file, which is shown in the following screenshot:

    The predicted data

The first highlighted string is the input tuple for which the prediction is to be made. After that, we can see that this input was converted into an Instance object with label = null and the features extracted from the input string in the form of a double array. The final highlighted number (1, in this case) represents the predicted category for this input.

Here, we have run the topology and classification in the local mode using LocalCluster and LocalDRPC, but this can run equally well on a Storm cluster. Apart from submitting the topology with StormSubmitter and querying the DRPC servers through a DRPC client instead of LocalDRPC, the main change that we will need to make is to write predictions to some central storage, such as NFS, instead of the local filesystem.
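The following is a minimal sketch of what the cluster-mode variant could look like. It reuses the spout, streams, and Trident-ML classes from step 6, but creates the DRPC stream without a LocalDRPC instance and submits the topology with StormSubmitter; the class name ClusterMLTopology and the ZooKeeper address are placeholders and not part of the code shown earlier:

    public class ClusterMLTopology {

      public static void main(String[] args) throws Exception {
        // Same Kafka spout and training stream as in step 6
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "training", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TridentTopology topology = new TridentTopology();
        TridentState kmeansState = topology
            .newStream("samples", new TransactionalTridentKafkaSpout(kafkaConfig))
            .each(new Fields("str"), new FeaturesToValues(), new Fields("instance"))
            .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"),
                new ClusterUpdater("kmeans", new KMeans(6)));

        // In the clustered mode, the DRPC stream is created without a LocalDRPC
        // instance; requests are served by the DRPC servers listed in storm.yaml
        topology.newDRPCStream("predict")
            .each(new Fields("args"), new FeaturesToValues(), new Fields("instance"))
            .stateQuery(kmeansState, new Fields("instance"), new ClusterQuery("kmeans"), new Fields("prediction"));

        // Submit the topology to the remote cluster instead of using LocalCluster
        StormSubmitter.submitTopology("kmeans", new Config(), topology.build());
      }
    }

A prediction client would then use backtype.storm.utils.DRPCClient in place of LocalDRPC, for example, new DRPCClient("server1", 3772).execute("predict", line), where server1 is one of the hosts listed under drpc.servers and 3772 is Storm's default DRPC port, and write its results to the shared storage.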
