A non-transactional topology

In a non-transactional topology, a spout emits a batch of tuples but makes no guarantees about what each batch contains. Based on processing behavior, such pipelines fall into two categories:

  • At-most-once processing: Failed tuples are not retried, so the spout does not wait for an acknowledgment.
  • At-least-once processing: Failed tuples are re-entered into the processing pipeline, so this type of topology guarantees that every tuple entering the pipeline is processed at least once. The retry logic is handled by the spout, because the spout is the source of tuples in a Trident topology.
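The difference between the two categories can be seen in a small plain-Java simulation. This is not Storm or Trident code: the class name DeliverySemanticsDemo and the process() rule (batch 2 fails on its first attempt only) are hypothetical, chosen so the outcome is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not the Storm/Trident API: a source emits numbered
// batches and the "pipeline" fails the first attempt of batch 2.
public class DeliverySemanticsDemo {

    // Returns true if processing succeeded; batch 2 fails on attempt 0 only.
    static boolean process(int batchId, int attempt) {
        return !(batchId == 2 && attempt == 0);
    }

    // At-most-once: each batch is tried exactly once; a failed batch is dropped.
    static List<Integer> atMostOnce(int batches) {
        List<Integer> processed = new ArrayList<>();
        for (int id = 0; id < batches; id++) {
            if (process(id, 0)) {
                processed.add(id);
            }
        }
        return processed;
    }

    // At-least-once: the source replays a failed batch until it succeeds.
    static List<Integer> atLeastOnce(int batches) {
        List<Integer> processed = new ArrayList<>();
        for (int id = 0; id < batches; id++) {
            int attempt = 0;
            while (!process(id, attempt)) {
                attempt++; // the retry decision lives at the source
            }
            processed.add(id);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("at-most-once:  " + atMostOnce(4));  // [0, 1, 3]
        System.out.println("at-least-once: " + atLeastOnce(4)); // [0, 1, 2, 3]
    }
}
```

The simulation only shows lost versus retried batches. In a real pipeline, a batch can fail after some of its tuples have already reached downstream operations, so a replay may process those tuples twice; that is why the guarantee is "at least" once rather than exactly once.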

Let's see how to write a non-transactional spout by implementing the storm.trident.spout.IBatchSpout interface:

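import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class FakeTweetSpout implements IBatchSpout {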

  private static final long serialVersionUID = 10L;
  private int batchSize;
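  // Batches emitted but not yet acknowledged, keyed by batchId, so a failed batch can be replayed.
  private final Map<Long, List<List<Object>>> batchesMap = new HashMap<Long, List<List<Object>>>();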
  public FakeTweetSpout(int batchSize) {
    this.batchSize = batchSize;
  }

  private static final Map<Integer, String> TWEET_MAP = new HashMap<Integer, String>();
  static {
    TWEET_MAP.put(0, "Adidas #FIFA World Cup Chant Challenge");
    TWEET_MAP.put(1, "#FIFA worldcup");
    TWEET_MAP.put(2, "#FIFA worldcup");
    TWEET_MAP.put(3, "The Great Gatsby is such a good #movie");
    TWEET_MAP.put(4, "#Movie top 10");
  }

  private static final Map<Integer, String> COUNTRY_MAP = new HashMap<Integer, String>();
  static {
    COUNTRY_MAP.put(0, "United States");
    COUNTRY_MAP.put(1, "Japan");
    COUNTRY_MAP.put(2, "India");
    COUNTRY_MAP.put(3, "China");
    COUNTRY_MAP.put(4, "Brazil");
  }

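  // Reuse a single generator instead of creating a new Random on every call.
  private final Random rand = new Random();

  // Builds one fake tweet: a random text paired with a random country.
  private List<Object> recordGenerator() {
    int tweetIndex = rand.nextInt(TWEET_MAP.size());
    int countryIndex = rand.nextInt(COUNTRY_MAP.size());
    return new Values(TWEET_MAP.get(tweetIndex), COUNTRY_MAP.get(countryIndex));
  }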

  @Override
  public void ack(long batchId) {
    // The batch was fully processed and will not be replayed, so free it.
    this.batchesMap.remove(batchId);
  }

  @Override
  public void close() {
    /* Close any connections opened in open(). */
  }

  @Override
  public void emitBatch(long batchId, TridentCollector collector) {
    // A failed batch is replayed with the same batchId, so re-emit the cached tuples.
    List<List<Object>> batch = this.batchesMap.get(batchId);
    if (batch == null) {
      // First attempt for this batchId: generate batchSize fresh tuples and cache them.
      batch = new ArrayList<List<Object>>();
      for (int i = 0; i < this.batchSize; i++) {
        batch.add(this.recordGenerator());
      }
      this.batchesMap.put(batchId, batch);
    }
    for (List<Object> tuple : batch) {
      collector.emit(tuple);
    }
  }

  @Override
  public Map getComponentConfiguration() {
    /* Returns spout-specific configuration, such as a limit on parallelism; null means none. */
    return null;
  }

  @Override
  public Fields getOutputFields() {

    return new Fields("text","Country");
  }

  @Override
  public void open(Map conf, TopologyContext context) {
    /* Initialize variables, open connections to external sources, etc. */
  }

}

The FakeTweetSpout class implements the storm.trident.spout.IBatchSpout interface. Its constructor, FakeTweetSpout(int batchSize), takes batchSize as an argument; if batchSize is 3, every batch emitted by FakeTweetSpout contains three tuples. The recordGenerator() method contains the logic to generate a fake tweet. The following are sample fake tweets:

["Adidas #FIFA World Cup Chant Challenge", "Brazil"]
["The Great Gatsby is such a good #movie", "India"]

The getOutputFields() method declares two fields, text and Country. The emitBatch(long batchId, TridentCollector collector) method uses the batchSize variable to decide the number of tuples in each batch and emits the batch to the processing pipeline.
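Declared field names and emitted values line up by position: the first value of each tuple is read as text and the second as Country. The plain-Java sketch below models that pairing without Storm classes; FieldMappingDemo and its toNamedTuple() helper are hypothetical and only mimic how a named field resolves to a value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Storm code: field names and emitted values are
// matched by index, so recordGenerator() must emit values in the order that
// getOutputFields() declares them.
public class FieldMappingDemo {

    // Pairs each declared field name with the value at the same position.
    static Map<String, Object> toNamedTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException(
                "expected " + fields.size() + " values, got " + values.size());
        }
        Map<String, Object> tuple = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            tuple.put(fields.get(i), values.get(i));
        }
        return tuple;
    }

    public static void main(String[] args) {
        List<String> outputFields = Arrays.asList("text", "Country");
        List<Object> emitted = Arrays.<Object>asList("#FIFA worldcup", "Japan");
        Map<String, Object> tuple = toNamedTuple(outputFields, emitted);
        System.out.println(tuple.get("Country")); // Japan
        System.out.println(tuple);                // {text=#FIFA worldcup, Country=Japan}
    }
}
```

Names are compared as exact strings here, so tuple.get("country") returns null. Storm's field lookup is also case-sensitive, so downstream operations must refer to the field as Country.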

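The batchesMap collection uses batchId as the key and the batch of tuples as the value. Every batch emitted by emitBatch(long batchId, TridentCollector collector) is added to batchesMap. If a batch fails, Trident calls emitBatch() again with the same batchId; because that batch is still in batchesMap, the spout re-emits exactly the same tuples instead of generating new ones.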

The ack(long batchId) method is called once the batch with that batchId has been processed successfully, and it removes the corresponding batch from batchesMap.
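The replay-and-acknowledge bookkeeping can be exercised outside Storm with a small model. In this sketch, ReplayCacheDemo is hypothetical: a returned List stands in for what TridentCollector would receive, and records are random integers instead of fake tweets. Only the emitBatch/ack contract mirrors FakeTweetSpout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical plain-Java model of FakeTweetSpout's replay cache (no Storm classes).
public class ReplayCacheDemo {
    private final Map<Long, List<Integer>> batchesMap = new HashMap<>();
    private final Random rand = new Random();
    private final int batchSize;

    ReplayCacheDemo(int batchSize) {
        this.batchSize = batchSize;
    }

    // Same contract as emitBatch: build a batch once, then replay it verbatim.
    List<Integer> emitBatch(long batchId) {
        List<Integer> batch = batchesMap.get(batchId);
        if (batch == null) {
            batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                batch.add(rand.nextInt(1000));
            }
            batchesMap.put(batchId, batch);
        }
        return new ArrayList<>(batch); // stands in for the tuples sent to the collector
    }

    // Same contract as ack: the batch succeeded, so it will not be replayed.
    void ack(long batchId) {
        batchesMap.remove(batchId);
    }

    // Number of batches still held for a possible replay.
    int pending() {
        return batchesMap.size();
    }

    public static void main(String[] args) {
        ReplayCacheDemo spout = new ReplayCacheDemo(3);
        List<Integer> first = spout.emitBatch(1L);
        List<Integer> replay = spout.emitBatch(1L); // simulate a retry of failed batch 1
        System.out.println(first.equals(replay));   // true
        spout.ack(1L);
        System.out.println(spout.pending());        // 0
    }
}
```

Because the cache lives only in memory, replays are identical only while the same spout instance is alive. If the worker restarts, a replayed batchId finds nothing in batchesMap and gets freshly generated tuples, which is consistent with a non-transactional spout making no guarantees about batch contents.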
