Now that we have the data to be used to train and predict in place, we will develop the Trident topology using the Trident-ML library.
Again, we will create a Maven project to implement our topology. The following are the steps to create this project:
1. Create a new Maven project with the com.learningstorm group ID and the ml artifact ID.
2. Add the following dependencies for Storm, the Storm-Kafka spout, and Trident-ML in the pom.xml file:

<!-- Dependency for Storm -->
<dependency>
  <groupId>storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.0.1</version>
  <scope>provided</scope>
</dependency>

<!-- Dependency for Storm-Kafka spout -->
<dependency>
  <groupId>net.wurstmeister.storm</groupId>
  <artifactId>storm-kafka-0.8-plus</artifactId>
  <version>0.4.0</version>
</dependency>

<!-- Dependency for Trident-ML -->
<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>
3. Add the following repository in the pom.xml file:

<repository>
  <id>clojars.org</id>
  <url>http://clojars.org/repo</url>
</repository>
4. Add the following Maven plugin in the pom.xml file. It will allow us to execute the Trident topology in the local mode using Maven:

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>exec-maven-plugin</artifactId>
  <version>1.2.1</version>
  <executions>
    <execution>
      <goals>
        <goal>exec</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <executable>java</executable>
    <includeProjectDependencies>true</includeProjectDependencies>
    <includePluginDependencies>false</includePluginDependencies>
    <classpathScope>compile</classpathScope>
    <mainClass>com.learningstorm.ml.TridentMLTopology</mainClass>
  </configuration>
</plugin>
5. Trident-ML processes the data represented by the com.github.pmerienne.trident.ml.core.Instance class. Let's create the com.learningstorm.ml.FeaturesToValues class that will convert the first string from the tuple into an Instance object. It will split the string on the space character and convert each number into a double value to create an Instance object. The following is the code for the FeaturesToValues class (a standalone sketch of this parsing appears after these steps):

public class FeaturesToValues extends BaseFunction {

    @SuppressWarnings("rawtypes")
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // get the input string
        String line = tuple.getString(0);
        double[] features = new double[60];

        // split the input string on whitespace, iterate over the parts,
        // and convert each one to a double
        String[] featureList = line.split("\\s+");
        for (int i = 0; i < features.length; i++) {
            features[i] = Double.parseDouble(featureList[i]);
        }

        // emit the Instance object with the features from the given input string
        collector.emit(new Values(new Instance(features)));
    }
}
6. Now, let's create the com.learningstorm.ml.TridentMLTopology class with the following code:

public class TridentMLTopology {

    public static void main(String[] args) throws InterruptedException, IOException {
        // Kafka spout definition
        // Specify the ZooKeeper hosts for Kafka; change as needed
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

        // Specify the topic name for training and the client id;
        // here, the topic name is 'training' and the client id is 'storm'
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "training", "storm");

        // Always consume from the start so that we can run the topology multiple
        // times while debugging. In production, this should be false.
        kafkaConfig.forceFromStart = true;

        // We have string data in Kafka, so specify the string scheme here
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Define the spout for reading from Kafka
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        // Topology definition
        // Now we will define the topology that will build the clustering model
        TridentTopology topology = new TridentTopology();

        // Training stream:
        // 1. Read a string from Kafka
        // 2. Convert the Trident tuple to an instance
        // 3. Update the state of the clusterer
        TridentState kmeansState = topology
                .newStream("samples", kafkaSpout)
                .each(new Fields("str"), new FeaturesToValues(), new Fields("instance"))
                .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"),
                        new ClusterUpdater("kmeans", new KMeans(6)));

        // Now we will build a LocalDRPC instance that will be used to predict the cluster of a tuple
        LocalDRPC localDRPC = new LocalDRPC();

        // Clustering stream:
        // 1. Define a new DRPC stream with name = 'predict'
        // 2. Convert the DRPC args to an instance
        // 3. Query the cluster state to classify the instance
        // We are using KMeans(6) as we want to cluster into 6 categories
        topology.newDRPCStream("predict", localDRPC)
                .each(new Fields("args"), new FeaturesToValues(), new Fields("instance"))
                .stateQuery(kmeansState, new Fields("instance"), new ClusterQuery("kmeans"),
                        new Fields("prediction"));

        // Create a new local cluster for testing
        LocalCluster cluster = new LocalCluster();

        // Submit the topology for execution
        cluster.submitTopology("kmeans", new Config(), topology.build());

        // Give the topology enough time to create the clustering model
        Thread.sleep(10000);

        // Create the prediction consumer; change the paths for the input and output files as needed
        PredictionConsumer predictionConsumer = new PredictionConsumer(localDRPC,
                "/home/anand/Desktop/prediction.data", "/home/anand/Desktop/predicted.data");

        // Predict and write the output
        predictionConsumer.predict();

        // Shut down the cluster and DRPC
        cluster.shutdown();
        localDRPC.shutdown();
    }
}
7. Now, let's create the com.learningstorm.ml.PredictionConsumer class with the following code:

public class PredictionConsumer {
    // DRPC instance used for prediction
    private final LocalDRPC drpc;

    // input file, generated by the Kafka producer, for prediction
    private final String input;

    // output file, where the predicted data will be stored
    private final String output;

    public PredictionConsumer(LocalDRPC drpc, String input, String output) {
        this.drpc = drpc;
        this.input = input;
        this.output = output;
    }

    /**
     * This method predicts the categories for the records in the input file
     * and writes them to the output file.
     */
    public void predict() throws IOException {
        // Scanner on the input file
        Scanner scanner = new Scanner(new File(input));

        // Writer for the output
        BufferedWriter writer = new BufferedWriter(new FileWriter(new File(output)));

        while (scanner.hasNextLine()) {
            String line = scanner.nextLine();
            if (line.trim().length() == 0) {
                // empty line, skip it
                continue;
            }

            // predict the category for this line
            String prediction = drpc.execute("predict", line);

            // write the predicted category for this line
            writer.write(prediction + "\n");
        }

        // close the scanner and the writer
        scanner.close();
        writer.close();
    }
}
8. Now, run the topology in the local mode with the following command:

mvn exec:java
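Before moving on, it may help to see the expected input format in isolation. The following is a minimal, standalone sketch (not part of the project; the class name and the random values are made up) that builds a line of 60 space-separated numbers and then parses it in the same way as the FeaturesToValues class:

public class FeatureParsingDemo {
    public static void main(String[] args) {
        // build a made-up input line of 60 space-separated numbers,
        // mimicking one record of the training/prediction data
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 60; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(Math.random());
        }
        String line = sb.toString();

        // parse the line exactly as FeaturesToValues does
        double[] features = new double[60];
        String[] featureList = line.split("\\s+");
        for (int i = 0; i < features.length; i++) {
            features[i] = Double.parseDouble(featureList[i]);
        }
        System.out.println("Parsed " + features.length + " features");
    }
}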
If we are not running DRPC in the local mode, we will need to launch the DRPC server before running the topology. The following are the steps to run the DRPC server in the clustered mode:
1. Start the DRPC server with the following command:
bin/storm drpc
2. Add the DRPC servers in the storm.yaml file with the following entry:

drpc.servers:
  - "server1"
  - "server2"
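Once a DRPC server is running, clients outside the topology can also query the predict function directly. The following is a minimal sketch of such a client, assuming a DRPC server on server1 listening on Storm's default DRPC port (3772); the class name and the feature line are hypothetical placeholders:

public class RemotePredictionClient {
    public static void main(String[] args) throws Exception {
        // connect to the remote DRPC server; 3772 is the default drpc.port
        DRPCClient client = new DRPCClient("server1", 3772);

        // 'predict' is the DRPC function name registered by the topology;
        // a real argument must contain 60 space-separated feature values
        String featureLine = "0.02 0.0371 0.0428"; // placeholder only
        String prediction = client.execute("predict", featureLine);
        System.out.println(prediction);

        client.close();
    }
}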
The first highlighted string is the input tuple for which the prediction is to be made. After that, we can see that this input was converted into an Instance object with label = null and the features extracted from the input string in the form of a double array. The final highlighted number (1, in this case) represents the predicted category for this input.
Here, we have run the topology and the classification in the local mode using LocalCluster and LocalDRPC, but this can run equally well on a Storm cluster. The only change that we will need to make is to write the predictions to some central storage, such as NFS, instead of the local filesystem.
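For reference, the following is a sketch of what the main method might look like when targeting a real cluster. It reuses the spout and stream definitions from the TridentMLTopology class, drops LocalCluster and LocalDRPC in favor of StormSubmitter and the cluster's DRPC servers, and is an assumption-laden outline rather than a drop-in implementation (the class name is hypothetical):

public class RemoteTridentMLTopology {
    public static void main(String[] args) throws Exception {
        // Kafka spout definition, same as in the local version, except that
        // forceFromStart is left at its default value of false for production
        BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "training", "storm");
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        // training stream, same as in the local version
        TridentTopology topology = new TridentTopology();
        TridentState kmeansState = topology
                .newStream("samples", kafkaSpout)
                .each(new Fields("str"), new FeaturesToValues(), new Fields("instance"))
                .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"),
                        new ClusterUpdater("kmeans", new KMeans(6)));

        // use the no-argument newDRPCStream variant so that prediction requests
        // are served by the cluster's DRPC servers instead of a LocalDRPC instance
        topology.newDRPCStream("predict")
                .each(new Fields("args"), new FeaturesToValues(), new Fields("instance"))
                .stateQuery(kmeansState, new Fields("instance"), new ClusterQuery("kmeans"),
                        new Fields("prediction"));

        // submit to the real cluster instead of a LocalCluster
        StormSubmitter.submitTopology("kmeans", new Config(), topology.build());
    }
}

Note that MemoryMapState keeps the learned clusters in worker memory, so the model will be lost if a worker restarts; a persistent state implementation is worth considering for a long-running cluster deployment.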