A sample Trident topology

This section explains how you can write a Trident topology. We will perform the following steps to create a sample Trident topology:

  1. Create a Maven project using com.learningstorm as the group ID and trident-example as the artifact ID.
  2. Add the following dependencies and repositories in the pom.xml file:
    <dependencies>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    
    <repositories>
      <repository>
        <id>clojars.org</id>
        <url>https://clojars.org/repo</url>
      </repository>
    </repositories>
  3. Create a TridentUtility class in the com.learningstorm.trident_example package. This class contains one Trident function and two Trident filters:
    package com.learningstorm.trident_example;

    import storm.trident.operation.BaseFilter;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;
    import backtype.storm.tuple.Values;

    public class TridentUtility {
      /* Takes a comma-separated value as input, splits it on commas, and emits one tuple per value. */
      public static class Split extends BaseFunction {

        private static final long serialVersionUID = 2L;

        public void execute(TridentTuple tuple, TridentCollector collector) {
          String countries = tuple.getString(0);
          for (String word : countries.split(",")) {
            collector.emit(new Values(word));
          }
        }
      }

      /* Keeps only those tuples whose text field (the first input field) contains #FIFA. */
      public static class TweetFilter extends BaseFilter {

        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
          return tuple.getString(0).contains("#FIFA");
        }
      }

      /* Prints every input tuple and keeps it, so the stream passes through unchanged. */
      public static class Print extends BaseFilter {

        private static final long serialVersionUID = 1L;

        public boolean isKeep(TridentTuple tuple) {
          System.out.println(tuple);
          return true;
        }
      }
    }

    The TridentUtility class contains the following three inner classes:

    • The Split class extends the storm.trident.operation.BaseFunction class and contains the execute(TridentTuple tuple, TridentCollector collector) method. The execute() method takes a comma-separated value as the input, splits it on commas, and emits one tuple per value.
    • The TweetFilter class extends the storm.trident.operation.BaseFilter class and contains the isKeep(TridentTuple tuple) method. The isKeep() method checks whether the text field of the input tuple contains #FIFA. It returns true if it does and false otherwise, so only #FIFA tweets pass through.
    • The Print class extends the storm.trident.operation.BaseFilter class and contains the isKeep(TridentTuple tuple) method. The isKeep() method prints the input tuple and returns true.
  4. Create a TridentHelloWorldTopology class in the com.learningstorm.trident_example package. This class defines the sample Trident topology; its code is as follows:
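    package com.learningstorm.trident_example;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import storm.trident.TridentTopology;
    import storm.trident.operation.builtin.Count;

    public class TridentHelloWorldTopology {

      public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
          // No arguments: run the topology in-process on a local cluster.
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("Count", conf, buildTopology());
        } else {
          // args[0] is the topology name used on the remote cluster.
          conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
      }

      public static StormTopology buildTopology() {

        // FakeTweetSpout is the spout from the "A non-transactional topology" section;
        // it is assumed to live in this package and emits the fields "text" and "Country".
        FakeTweetSpout spout = new FakeTweetSpout(10);
        TridentTopology topology = new TridentTopology();

        topology.newStream("faketweetspout", spout)
            .shuffle()
            .each(new Fields("text", "Country"), new TridentUtility.TweetFilter())
            .groupBy(new Fields("Country"))
            .aggregate(new Fields("Country"), new Count(), new Fields("count"))
            .each(new Fields("count"), new TridentUtility.Print())
            .parallelismHint(2);

        return topology.build();
      }
    }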

    Let's understand the code line by line. First, we create an object of the TridentTopology class to define the Trident computation.

    The TridentTopology class contains a method called newStream() that takes two arguments: a name for the stream and the input source. In this example, we use the FakeTweetSpout class created in the "A non-transactional topology" section as the input source. Like Storm, Trident maintains the state of each input source in ZooKeeper; the faketweetspout string specifies the node name in ZooKeeper where Trident keeps this metadata.

    The spout emits a stream with two fields, text and Country.

    We repartition the batches of tuples emitted by the input source using the shuffle() operation, which redistributes the tuples evenly, in random order, across the target partitions. The next step of the topology definition applies the TweetFilter class to each tuple. TweetFilter discards every tuple that does not contain the #FIFA keyword.

    The output of the TweetFilter class is grouped by the Country field. Then, we apply the Count aggregator to count the number of tweets for each country. Finally, we apply the Print class to print the output of the aggregate() method.

    The following is the console output of the TridentHelloWorldTopology topology:


    The output of the sample Trident topology
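To see what the filter, groupBy, and Count stages compute for a single batch without starting a cluster, the following plain-Java sketch models the same logic. It does not use the Trident API: the class BatchCountSketch, its method, and the sample tweets are hypothetical, and the TreeMap is only there to make the printed order stable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of one Trident batch flowing through
// TweetFilter -> groupBy("Country") -> aggregate(Count). No Storm classes are used.
public class BatchCountSketch {

    // Each element is {text, Country}, matching the spout's field order.
    static Map<String, Long> countFifaTweetsByCountry(List<String[]> batch) {
        Map<String, Long> counts = new TreeMap<>(); // sorted keys for stable printing
        for (String[] tuple : batch) {
            String text = tuple[0];
            String country = tuple[1];
            if (!text.contains("#FIFA")) {
                continue; // same rule as TweetFilter.isKeep()
            }
            counts.merge(country, 1L, Long::sum); // Count: add one per tuple in the group
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String[]> batch = Arrays.asList(
            new String[] {"Great goal #FIFA", "Brazil"},
            new String[] {"Rain delay", "Spain"},
            new String[] {"What a match #FIFA", "Spain"},
            new String[] {"#FIFA fan zone", "Brazil"});
        System.out.println(countFifaTweetsByCountry(batch)); // {Brazil=2, Spain=1}
    }
}
```

In the real topology, aggregate() runs on each batch independently, so the printed counts are per batch rather than running totals; Trident's persistentAggregate() is the operation intended for totals kept across batches.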

The following diagram shows the execution of the sample Trident topology:


The high-level view of the sample Trident topology
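The shuffle() step redistributes each batch across the partitions of the next operation. The Trident documentation describes shuffle as a random round-robin that spreads tuples evenly; the plain-Java sketch below illustrates that idea only. The class ShuffleSketch, the partition count, and the fixed seed are hypothetical choices, and this is not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of "random round robin" repartitioning in the spirit of
// Trident's shuffle(). It is not Storm's actual implementation.
public class ShuffleSketch {

    // Assigns every tuple in the batch to exactly one of numPartitions partitions.
    static List<List<String>> shuffle(List<String> batch, int numPartitions, long seed) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Random random = new Random(seed);
        List<List<String>> partitions = new ArrayList<>();
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<String>());
            order.add(i);
        }
        int next = numPartitions; // forces a shuffle before the first tuple
        for (String tuple : batch) {
            if (next == numPartitions) {
                Collections.shuffle(order, random); // fresh random order for each round
                next = 0;
            }
            partitions.get(order.get(next++)).add(tuple);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("t1", "t2", "t3", "t4", "t5", "t6");
        List<List<String>> parts = shuffle(batch, 2, 42L);
        for (int i = 0; i < parts.size(); i++) {
            // Each partition receives 3 of the 6 tuples; which ones depends on the seed.
            System.out.println("partition " + i + ": " + parts.get(i));
        }
    }
}
```

Shuffling the partition order once per round keeps the assignment random while guaranteeing that partition sizes differ by at most one tuple.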
