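In a non-transactional topology, a spout emits a batch of tuples and doesn't guarantee what is in each batch. Based on how failed tuples are handled, such a pipeline falls into two categories:
At-most-once processing: Failed tuples are not retried, so the spout does not wait for an acknowledgment.
At-least-once processing: Failed tuples are retried in the processing pipeline, so every tuple that enters the pipeline is processed at least once.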
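Whether a non-transactional topology gives at-most-once or at-least-once processing depends on what the spout does when a failed batch is replayed. The following Storm-free sketch illustrates the difference. ReplayPolicySpout and its methods are hypothetical stand-ins, not Storm APIs, and the sketch assumes that a repeated batch ID signals a replay.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a non-transactional batch spout (not a Storm class).
class ReplayPolicySpout {
    // true = at-least-once (replay failed batches), false = at-most-once (drop them)
    private final boolean replayFailedBatches;
    private final Map<Long, List<String>> pending = new HashMap<>();
    private int counter = 0;

    ReplayPolicySpout(boolean replayFailedBatches) {
        this.replayFailedBatches = replayFailedBatches;
    }

    // Returns the tuples emitted for batchId; a batchId seen before is treated as a replay.
    List<String> emitBatch(long batchId) {
        List<String> cached = pending.get(batchId);
        if (cached != null) {
            // At-least-once re-emits the same tuples; at-most-once emits nothing for the replay.
            return replayFailedBatches ? cached : new ArrayList<String>();
        }
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            batch.add("tuple-" + (counter++));
        }
        pending.put(batchId, batch);
        return batch;
    }

    // The batch succeeded, so it no longer needs to be remembered.
    void ack(long batchId) {
        pending.remove(batchId);
    }
}
```

With replayFailedBatches set to true, a second call with the same batch ID returns the original tuples; with false, the failed tuples are simply lost.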
Let's see how to write a non-transactional spout by implementing the storm.trident.spout.IBatchSpout interface:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Pre-1.0 Storm package names, matching storm.trident.spout.IBatchSpout
// (Storm 1.0+ moved these classes under org.apache.storm).
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

public class FakeTweetSpout implements IBatchSpout {

    private static final long serialVersionUID = 10L;

    // Number of tuples in every batch
    private int batchSize;

    // batchId -> tuples emitted for that batch, kept until the batch is acked
    private HashMap<Long, List<List<Object>>> batchesMap = new HashMap<Long, List<List<Object>>>();

    // One generator reused for every record instead of a new one per call
    private final Random rand = new Random();

    public FakeTweetSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    private static final Map<Integer, String> TWEET_MAP = new HashMap<Integer, String>();
    static {
        TWEET_MAP.put(0, "Adidas #FIFA World Cup Chant Challenge");
        TWEET_MAP.put(1, "#FIFA worldcup");
        TWEET_MAP.put(2, "#FIFA worldcup");
        TWEET_MAP.put(3, "The Great Gatsby is such a good #movie");
        TWEET_MAP.put(4, "#Movie top 10");
    }

    private static final Map<Integer, String> COUNTRY_MAP = new HashMap<Integer, String>();
    static {
        COUNTRY_MAP.put(0, "United States");
        COUNTRY_MAP.put(1, "Japan");
        COUNTRY_MAP.put(2, "India");
        COUNTRY_MAP.put(3, "China");
        COUNTRY_MAP.put(4, "Brazil");
    }

    // Builds one fake tweet: a random tweet text paired with a random country
    private List<Object> recordGenerator() {
        int randomNumber = rand.nextInt(5);
        int randomNumber2 = rand.nextInt(5);
        return new Values(TWEET_MAP.get(randomNumber), COUNTRY_MAP.get(randomNumber2));
    }

    @Override
    public void ack(long batchId) {
        // The batch was processed successfully, so its tuples can be discarded
        this.batchesMap.remove(batchId);
    }

    @Override
    public void close() {
        /* This method is used to destroy or close all the connections opened in the open method. */
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        List<List<Object>> batches = this.batchesMap.get(batchId);
        if (batches == null) {
            // First emission of this batchId: generate a new batch and cache it
            batches = new ArrayList<List<Object>>();
            for (int i = 0; i < this.batchSize; i++) {
                batches.add(this.recordGenerator());
            }
            this.batchesMap.put(batchId, batches);
        }
        // On a replay of the same batchId, the cached tuples are emitted again
        for (List<Object> list : batches) {
            collector.emit(list);
        }
    }

    @Override
    public Map getComponentConfiguration() {
        /* This method is used to set the spout configuration, such as the parallelism. */
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("text", "Country");
    }

    @Override
    public void open(Map arg0, TopologyContext arg1) {
        /* This method is used to initialize variables, open connections to external sources, etc. */
    }
}
The FakeTweetSpout class implements the storm.trident.spout.IBatchSpout interface. Its constructor, FakeTweetSpout(int batchSize), takes batchSize as an argument; if batchSize is 3, every batch emitted by the FakeTweetSpout class contains three tuples. The recordGenerator() method contains the logic to generate a fake tweet by pairing a randomly chosen tweet text with a randomly chosen country. The following are sample fake tweets:
["Adidas #FIFA World Cup Chant Challenge", "Brazil"]
["The Great Gatsby is such a good #movie", "India"]
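To see how batchSize and recordGenerator() interact without a running cluster, here is a minimal plain-Java sketch. FakeTweetGenerator and nextBatch() are hypothetical names, and the seeded java.util.Random is an assumption added so that runs are reproducible; the spout itself uses an unseeded Random.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical, Storm-free sketch of how FakeTweetSpout builds one batch.
class FakeTweetGenerator {
    static final String[] TWEETS = {
        "Adidas #FIFA World Cup Chant Challenge", "#FIFA worldcup", "#FIFA worldcup",
        "The Great Gatsby is such a good #movie", "#Movie top 10"
    };
    static final String[] COUNTRIES = {"United States", "Japan", "India", "China", "Brazil"};

    private final int batchSize;
    private final Random rand;

    FakeTweetGenerator(int batchSize, long seed) {
        this.batchSize = batchSize;
        this.rand = new Random(seed); // seeded for reproducible output
    }

    // Picks the tweet text and the country independently, like recordGenerator().
    List<Object> recordGenerator() {
        return Arrays.<Object>asList(
            TWEETS[rand.nextInt(TWEETS.length)],
            COUNTRIES[rand.nextInt(COUNTRIES.length)]);
    }

    // A batch always holds exactly batchSize records.
    List<List<Object>> nextBatch() {
        List<List<Object>> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            batch.add(recordGenerator());
        }
        return batch;
    }
}
```

Calling new FakeTweetGenerator(3, 42L).nextBatch() returns three [text, country] pairs, matching the batchSize of 3 example above.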
The getOutputFields() method declares two fields, text and Country. The emitBatch(long batchId, TridentCollector collector) method uses the batchSize variable to decide the number of tuples in each batch and emits the batch to the processing pipeline.
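In Storm, the names declared by getOutputFields() are matched to emitted values by position, so the first value of each tuple is read as text and the second as Country. The FieldBinding helper below is hypothetical (not part of Storm); it makes that positional pairing explicit, and as a design choice of this sketch it rejects a tuple whose size differs from the number of declared fields.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs declared field names with an emitted tuple by position.
class FieldBinding {
    static Map<String, Object> bind(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("tuple has " + values.size()
                + " values but " + fieldNames.size() + " fields were declared");
        }
        // LinkedHashMap keeps the declaration order of the fields.
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            named.put(fieldNames.get(i), values.get(i));
        }
        return named;
    }
}
```

For example, binding the fields ("text", "Country") to the values ("#FIFA worldcup", "Japan") yields a map in which "Country" resolves to "Japan".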
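The batchesMap collection uses batchId as the key and the batch of tuples as the value. Every batch generated by emitBatch(long batchId, TridentCollector collector) is stored in batchesMap, so if emitBatch is called again with the same batchId to replay a failed batch, the spout re-emits the cached tuples instead of generating new ones.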
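The replay behavior of batchesMap can be checked outside Storm with the sketch below. RecordingCollector is a hypothetical stand-in for TridentCollector, and CachingBatchSpout reproduces only the caching logic of emitBatch(); the placeholder tuple values are assumptions, not the spout's real tweets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for TridentCollector: records every emitted tuple.
class RecordingCollector {
    final List<List<Object>> emitted = new ArrayList<>();

    void emit(List<Object> values) {
        emitted.add(values);
    }
}

// Storm-free sketch of the batchesMap caching done by FakeTweetSpout.emitBatch().
class CachingBatchSpout {
    private final int batchSize;
    private final Random rand = new Random(7L);
    final Map<Long, List<List<Object>>> batchesMap = new HashMap<>();

    CachingBatchSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    void emitBatch(long batchId, RecordingCollector collector) {
        List<List<Object>> batch = batchesMap.get(batchId);
        if (batch == null) {
            // First time this batchId is seen: generate the tuples and remember them.
            batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                List<Object> tuple = new ArrayList<>();
                tuple.add("tweet-" + rand.nextInt(1000));
                tuple.add("country-" + rand.nextInt(5));
                batch.add(tuple);
            }
            batchesMap.put(batchId, batch);
        }
        // A replay of the same batchId emits the same tuples in the same order.
        for (List<Object> tuple : batch) {
            collector.emit(tuple);
        }
    }
}
```

Emitting batch 5 twice into two separate collectors produces identical tuple lists, which is what lets a failed batch be retried with the same data.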
The ack(long batchId) method receives the batchId of a successfully processed batch and removes the corresponding batch from the batchesMap collection, so the map holds only batches that may still need to be replayed.
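The sketch below isolates the bookkeeping that ack(long batchId) performs. PendingBatchTracker and its methods are hypothetical; the sketch shows that the map shrinks only when batches are acknowledged, so a spout that never removed entries would keep every emitted batch in memory.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tracker mirroring how batchesMap grows on emit and shrinks on ack.
class PendingBatchTracker {
    private final Map<Long, List<String>> batchesMap = new HashMap<>();

    // putIfAbsent keeps the original tuples if the same batchId is emitted again.
    void emitted(long batchId, List<String> tuples) {
        batchesMap.putIfAbsent(batchId, tuples);
    }

    // Mirrors ack(long batchId); returns false if the batch was not pending.
    boolean ack(long batchId) {
        return batchesMap.remove(batchId) != null;
    }

    int pending() {
        return batchesMap.size();
    }
}
```

Acknowledging a batch twice is harmless here: the second call finds nothing to remove and returns false.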