This section explains how you can write a Trident topology. We will perform the following steps to create a sample Trident topology:

1. Create a Maven project using `com.learningstorm` as the group ID and `trident-example` as the artifact ID.
2. Add the following dependencies and repositories to the `pom.xml` file:

```xml
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>3.8.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.9.0.1</version>
    <scope>provided</scope>
  </dependency>
</dependencies>
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>
```
3. Create a `TridentUtility` class in the `com.learningstorm.trident_example` package. This class contains the Trident function and the two Trident filters used by the topology:

```java
package com.learningstorm.trident_example;

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFilter;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TridentUtility {

    /* Takes a comma-separated value as input, splits it on commas,
       and emits one output tuple per item. */
    public static class Split extends BaseFunction {
        private static final long serialVersionUID = 2L;

        public void execute(TridentTuple tuple, TridentCollector collector) {
            String countries = tuple.getString(0);
            for (String word : countries.split(",")) {
                collector.emit(new Values(word));
            }
        }
    }

    /* Keeps only those tuples whose text field (the first input field)
       contains #FIFA; all other tuples are dropped. */
    public static class TweetFilter extends BaseFilter {
        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
            return tuple.getString(0).contains("#FIFA");
        }
    }

    /* Prints each input tuple and always keeps it in the stream. */
    public static class Print extends BaseFilter {
        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
            System.out.println(tuple);
            return true;
        }
    }
}
```
The `TridentUtility` class contains the following three inner classes:

- `Split`: This class extends the `storm.trident.operation.BaseFunction` class and contains the `execute(TridentTuple tuple, TridentCollector collector)` method. The `execute()` method takes a comma-separated value as the input, splits the input value, and emits multiple tuples as the output.
- `TweetFilter`: This class extends the `storm.trident.operation.BaseFilter` class and contains the `isKeep(TridentTuple tuple)` method. The `isKeep()` method takes the tuple as the input and checks whether its text field contains the `#FIFA` value. If it does, the method returns `true`; otherwise, it returns `false`.
- `Print`: This class extends the `storm.trident.operation.BaseFilter` class and contains the `isKeep(TridentTuple tuple)` method. The `isKeep()` method prints the input tuple and returns `true`.

4. Create a `TridentHelloWorldTopology` class in the `com.learningstorm.trident_example` package. This class defines the sample Trident topology. Its code is as follows:

```java
package com.learningstorm.trident_example;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;

public class TridentHelloWorldTopology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            // No arguments: run the topology in-process on a local cluster.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Count", conf, buildTopology());
        } else {
            // First argument: topology name on a remote cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }

    public static StormTopology buildTopology() {
        // FakeTweetSpout is the spout from the "A non-transactional topology"
        // section; it emits tuples with the fields "text" and "Country".
        FakeTweetSpout spout = new FakeTweetSpout(10);
        TridentTopology topology = new TridentTopology();

        topology.newStream("faketweetspout", spout)
                .shuffle()
                .each(new Fields("text", "Country"), new TridentUtility.TweetFilter())
                .groupBy(new Fields("Country"))
                .aggregate(new Fields("Country"), new Count(), new Fields("count"))
                .each(new Fields("count"), new TridentUtility.Print())
                .parallelismHint(2);

        return topology.build();
    }
}
```
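`Split` and `TweetFilter` contain only string logic, so that logic can be checked without a running cluster. The following is a minimal plain-Java sketch that mirrors their method bodies on ordinary strings. The class name `TridentUtilitySketch` and the methods `split` and `keep` are hypothetical stand-ins, not part of the Trident API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java mirror of TridentUtility's per-tuple logic (no Storm dependency).
final class TridentUtilitySketch {

    // Mirrors Split.execute(): one output value per comma-separated item.
    static List<String> split(String countries) {
        List<String> out = new ArrayList<>();
        for (String word : countries.split(",")) {
            out.add(word);
        }
        return out;
    }

    // Mirrors TweetFilter.isKeep(): keep the tuple only if its text contains #FIFA.
    static boolean keep(String text) {
        return text.contains("#FIFA");
    }

    public static void main(String[] args) {
        System.out.println(split("India,Brazil,Spain"));        // [India, Brazil, Spain]
        System.out.println(keep("Watching the #FIFA final"));   // true
        System.out.println(keep("Nice weather today"));         // false
    }
}
```

Note that `String.contains()` is case-sensitive. A tweet tagged `#fifa` would therefore be dropped by `TweetFilter`.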
Let's go through the code line by line. First, we create an object of the `TridentTopology` class to define the Trident computation.
The `TridentTopology` class contains a method called `newStream()`, which takes a name string and the input source as arguments. In this example, we use the `FakeTweetSpout` class created in the "A non-transactional topology" section as the input source. Like Storm, Trident maintains the state of each input source in ZooKeeper. Here, the `faketweetspout` string specifies the node name in ZooKeeper where Trident maintains the metadata.
The spout emits a stream with two fields, `text` and `Country`.
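The topology hands these two fields to `TweetFilter` through `each(new Fields("text", "Country"), ...)`. Trident passes an operation only the fields named in its input `Fields`, in that order. That is why `TweetFilter` can read the tweet text with `getString(0)`. The sketch below models that projection with a plain `Map`. The class `ProjectionSketch` and its `project` helper are hypothetical and only approximate what Trident does internally. Like Trident field names, the keys are case-sensitive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the input-field projection an each() call performs.
final class ProjectionSketch {

    // Returns the values of the requested fields, in the requested order.
    static List<Object> project(Map<String, Object> tuple, List<String> inputFields) {
        List<Object> projected = new ArrayList<>();
        for (String field : inputFields) {
            if (!tuple.containsKey(field)) {
                // The sketch rejects unknown (or wrongly cased) field names.
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            projected.add(tuple.get(field));
        }
        return projected;
    }

    public static void main(String[] args) {
        // One tuple as the spout emits it: fields "text" and "Country".
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("text", "Go team! #FIFA");
        tuple.put("Country", "Brazil");

        // Same order as new Fields("text", "Country"): index 0 is the text.
        List<Object> view = project(tuple, Arrays.asList("text", "Country"));
        System.out.println(view.get(0)); // Go team! #FIFA
    }
}
```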
We repartition the batches of tuples emitted by the input source using the `shuffle()` operation. The next line of the topology definition applies the `TweetFilter` class to each tuple. The `TweetFilter` class filters out all tuples that do not contain the `#FIFA` keyword.
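`shuffle()` spreads tuples across the partitions of the next stage to balance load, so no tuple is tied to a particular partition. After the shuffle, each partition runs `TweetFilter` on its own share. The following is a simplified model of that idea, not Storm's implementation. It uses plain random assignment with a seeded `java.util.Random`, whereas Storm describes `shuffle()` as a random round-robin redistribution. The class name, the helper names and the partition count are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical model of shuffle() followed by a per-partition filter.
final class ShuffleSketch {

    // Assigns every tuple to a randomly chosen partition; nothing is lost or duplicated.
    static List<List<String>> shuffle(List<String> batch, int numPartitions, Random random) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String tuple : batch) {
            partitions.get(random.nextInt(numPartitions)).add(tuple);
        }
        return partitions;
    }

    // Each partition applies the TweetFilter logic independently.
    static List<List<String>> filterFifa(List<List<String>> partitions) {
        List<List<String>> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            List<String> kept = new ArrayList<>();
            for (String text : partition) {
                if (text.contains("#FIFA")) {
                    kept.add(text);
                }
            }
            result.add(kept);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("#FIFA goal!", "lunch time", "#FIFA penalty", "traffic again");
        int kept = 0;
        for (List<String> p : filterFifa(shuffle(batch, 2, new Random(42)))) {
            kept += p.size();
        }
        System.out.println(kept); // 2, whichever partitions the tuples land in
    }
}
```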
The output of the `TweetFilter` class is grouped by the `Country` field. Then, we apply the `Count` aggregator to count the number of tweets for each country. Finally, we apply the `Print` filter to print the output of the `aggregate()` method.
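On a grouped stream, `aggregate()` produces one count per `Country` value for each batch. It does not carry counts over from earlier batches; Trident's `persistentAggregate()` is the operation for running totals. The sketch below models a single batch as the list of countries whose tweets passed `TweetFilter`. The class `GroupCountSketch` and its helper `countByCountry` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of groupBy(new Fields("Country")) + aggregate(new Count()) on ONE batch.
final class GroupCountSketch {

    // Counts tuples per country within a single batch; TreeMap keeps the output order stable.
    static Map<String, Long> countByCountry(List<String> countriesInBatch) {
        Map<String, Long> counts = new TreeMap<>();
        for (String country : countriesInBatch) {
            counts.merge(country, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch1 = Arrays.asList("India", "Brazil", "India");
        List<String> batch2 = Arrays.asList("Brazil");
        // Each batch is counted on its own, as aggregate() does; nothing is carried over.
        System.out.println(countByCountry(batch1)); // {Brazil=1, India=2}
        System.out.println(countByCountry(batch2)); // {Brazil=1}
    }
}
```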
The following is the console output of the `TridentHelloWorldTopology` topology:
The following diagram shows the execution of the sample Trident topology: