Trident aggregators

Trident's aggregators perform aggregation operations on an input batch, partition, or stream. For example, if a user wants to count the number of tuples in each batch, he/she can use the Count aggregator. The output of an aggregator completely replaces the input tuples it consumes. There are three types of aggregators available in Trident:

  • The partition aggregate
  • The aggregate
  • The persistent aggregate

Let's understand each type of aggregator in detail.

The partition aggregate

As the name suggests, the partition aggregate works on each partition instead of the entire batch. Its output completely replaces the input tuples of the partition. With the Count aggregator, the output for each partition is a single tuple with a single field. The following piece of code shows how we can use the partitionAggregate method:

mystream.partitionAggregate(new Fields("x"), new Count(), new Fields("count"));

For example, suppose we have an input stream that contains the x and y fields, and we apply the partitionAggregate function to it. For each partition, the output is a single tuple with one field called count, which holds the number of tuples in that input partition. The following diagram shows how the partitionAggregate function works:

Working of the partition aggregate
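The per-partition behavior can be sketched in plain Java, without any Storm classes. In this minimal model, which is an illustration rather than Trident's implementation, a hypothetical Tuple class holds the x and y fields, a partition is a list of tuples, and a batch is a list of partitions. The hypothetical countPerPartition method returns what partitionAggregate with Count would emit: one count per partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of partitionAggregate(new Count(), ...); no Storm classes.
final class PartitionCountSketch {

  // Hypothetical stand-in for a Trident tuple with fields x and y.
  static final class Tuple {
    final String x;
    final long y;

    Tuple(String x, long y) {
      this.x = x;
      this.y = y;
    }
  }

  // Returns one value per partition: the number of tuples in it. The input
  // tuples themselves are not passed through, mirroring how the aggregate
  // output replaces them.
  static List<Long> countPerPartition(List<List<Tuple>> partitions) {
    List<Long> counts = new ArrayList<>();
    for (List<Tuple> partition : partitions) {
      counts.add((long) partition.size());
    }
    return counts;
  }

  public static void main(String[] args) {
    // One batch split into two partitions.
    List<List<Tuple>> batch = Arrays.asList(
        Arrays.asList(new Tuple("a", 1), new Tuple("b", 2), new Tuple("c", 3)),
        Arrays.asList(new Tuple("d", 4), new Tuple("e", 5)));
    System.out.println(countPerPartition(batch)); // [3, 2]
  }
}
```

Each partition yields exactly one value, no matter how many tuples it held.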

The aggregate

The aggregate operation works on each batch. During aggregation, the tuples are first repartitioned using the global operation, which combines all partitions of the same batch into a single partition. Then, the aggregation function runs on each batch. The following piece of code shows how we can use the aggregate function:

mystream.aggregate(new Fields("x"), new Count(), new Fields("count"));
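The two steps of aggregate can be modeled the same way. The following plain-Java sketch (hypothetical class and method names, no Storm dependency) first merges every partition of a batch into one list, standing in for the global repartitioning, and then counts that single list, so the whole batch yields one value instead of one value per partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of aggregate(new Count(), ...) for one batch; no Storm classes.
final class BatchAggregateSketch {

  // Step 1: model the global repartitioning, which moves every tuple of the
  // batch into a single partition.
  static <T> List<T> globalRepartition(List<List<T>> partitions) {
    List<T> single = new ArrayList<>();
    for (List<T> p : partitions) {
      single.addAll(p);
    }
    return single;
  }

  // Step 2: run the aggregation once on that single partition. Here the
  // aggregation is a count, so the batch yields exactly one value.
  static <T> long countBatch(List<List<T>> partitions) {
    return globalRepartition(partitions).size();
  }

  public static void main(String[] args) {
    List<List<String>> batch = Arrays.asList(
        Arrays.asList("t1", "t2", "t3"),
        Arrays.asList("t4", "t5"));
    System.out.println(countBatch(batch)); // 5
  }
}
```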

Trident provides three aggregator interfaces:

  • ReducerAggregator
  • Aggregator
  • CombinerAggregator

The preceding three Aggregator interfaces can also be used with the partition aggregate.

The ReducerAggregator interface

The ReducerAggregator interface first runs the global repartitioning operation on the input stream to combine all the partitions of the same batch into a single partition, and then runs the aggregation function on each batch. The ReducerAggregator<T> interface contains the following methods:

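  • init(): This method returns the initial value of the aggregation.
  • reduce(T curr, TridentTuple tuple): This method is called once for each input tuple with the current accumulated value, and returns the updated value. After the last tuple of the batch, Trident emits the final value as a single tuple with a single field.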

The following example code shows how we can implement a Sum class using the ReducerAggregator interface:

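// Nested inside the topology class; the enclosing file needs these imports
// (Storm 1.x package names):
import org.apache.storm.trident.operation.ReducerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public static class Sum implements ReducerAggregator<Long> {

  private static final long serialVersionUID = 1L;

  // Return the initial value of the running sum: zero
  @Override
  public Long init() {
    return 0L;
  }

  // Called once per input tuple: add the tuple's first field to the running
  // sum. The final value is emitted as a single tuple with a single field.
  @Override
  public Long reduce(Long curr, TridentTuple tuple) {
    return curr + tuple.getLong(0);
  }

}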
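To show the order in which these two methods are called, here is a plain-Java sketch that drives a reducer over one batch. ReducerLike is a hypothetical stand-in for ReducerAggregator<T>, and each tuple is reduced to the single long field that the Sum example reads with getLong(0). The sketch assumes the global repartitioning has already happened, so the batch arrives as one list.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of how a ReducerAggregator is driven for one batch.
// ReducerLike is hypothetical; no Storm classes are used.
final class ReducerDriverSketch {

  interface ReducerLike<T> {
    T init();                     // initial accumulator value
    T reduce(T curr, long field); // fold one tuple into the accumulator
  }

  // Same logic as the Sum example: start at 0 and add each tuple's field.
  static final ReducerLike<Long> SUM = new ReducerLike<Long>() {
    public Long init() {
      return 0L;
    }

    public Long reduce(Long curr, long field) {
      return curr + field;
    }
  };

  // init() is called once per batch; reduce() once per tuple, in order.
  // The final accumulator becomes the batch's single output value.
  static <T> T runBatch(ReducerLike<T> agg, List<Long> batchFields) {
    T acc = agg.init();
    for (long field : batchFields) {
      acc = agg.reduce(acc, field);
    }
    return acc;
  }

  public static void main(String[] args) {
    System.out.println(runBatch(SUM, Arrays.asList(3L, 4L, 5L))); // 12
  }
}
```

An empty batch simply returns the value from init(), here 0.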

The Aggregator interface

Like the ReducerAggregator interface, the Aggregator interface first runs the global repartitioning operation on the input stream to combine all the partitions of the same batch into a single partition, and then runs the aggregation function on each batch. Unlike ReducerAggregator, however, it keeps its intermediate result in an explicit state object and emits output through a TridentCollector. The Aggregator<State> interface, which is usually implemented by extending the BaseAggregator<State> class, contains the following methods:

  • init(Object batchId, TridentCollector collector): This method is called before processing of the batch starts. It returns the State object, which holds the state of the batch and is passed to the aggregate() and complete() methods.
  • aggregate(State state, TridentTuple tuple, TridentCollector collector): This method is called once for each tuple of the batch and updates the State object.
  • complete(State state, TridentCollector collector): This method is called after all tuples of the batch have been processed. It emits the final result through the collector; in the following example, a single tuple for each batch.

The following example shows how we can implement the SumAsAggregator class by extending the BaseAggregator class:

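// Nested inside the topology class; the enclosing file needs these imports
// (Storm 1.x package names):
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

public static class SumAsAggregator extends BaseAggregator<SumAsAggregator.State> {

  private static final long serialVersionUID = 1L;

  // Per-batch state: the running sum of the batch
  static class State {
    long sum = 0;
  }

  // Called once at the start of each batch: create a fresh state
  @Override
  public State init(Object batchId, TridentCollector collector) {
    return new State();
  }

  // Called once per tuple: add the tuple's first field to the running sum
  @Override
  public void aggregate(State state, TridentTuple tridentTuple, TridentCollector tridentCollector) {
    state.sum += tridentTuple.getLong(0);
  }

  // Called after all tuples of the batch are processed:
  // emit a single tuple with a single value, the batch sum
  @Override
  public void complete(State state, TridentCollector tridentCollector) {
    tridentCollector.emit(new Values(state.sum));
  }

}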
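The lifecycle of SumAsAggregator across several batches can be sketched in plain Java. This model does not use Storm: a hypothetical List<Long> plays the role of the TridentCollector, and each tuple is reduced to its first field. Every batch gets a fresh State from init(), so sums do not leak from one batch into the next.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the Aggregator lifecycle; no Storm classes. The
// "collector" is a hypothetical List<Long> standing in for TridentCollector.
final class AggregatorLifecycleSketch {

  static final class State {
    long sum = 0;
  }

  // Mirrors SumAsAggregator: fresh state per batch, update per tuple,
  // emit once in complete().
  static State init(Object batchId) {
    return new State();
  }

  static void aggregate(State state, long field) {
    state.sum += field;
  }

  static void complete(State state, List<Long> collector) {
    collector.add(state.sum);
  }

  // Drives several batches: init -> aggregate (per tuple) -> complete.
  static List<Long> run(List<List<Long>> batches) {
    List<Long> emitted = new ArrayList<>();
    int batchId = 0;
    for (List<Long> batch : batches) {
      State state = init(batchId++);
      for (long field : batch) {
        aggregate(state, field);
      }
      complete(state, emitted);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<List<Long>> batches = Arrays.asList(
        Arrays.asList(1L, 2L, 3L),
        Arrays.asList(10L, 20L));
    System.out.println(run(batches)); // [6, 30]
  }
}
```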

The CombinerAggregator interface

The CombinerAggregator interface first runs the partition aggregate on each partition, then runs the global repartitioning operation to combine all the partitions of the same batch into a single partition, and finally reruns the aggregator on that partition to emit the desired output. Because each partition sends only its partial result over the network instead of all of its tuples, the CombinerAggregator interface transfers less data than the other two interfaces. Hence, it generally performs better than the Aggregator and ReducerAggregator interfaces. The CombinerAggregator<T> interface contains the following methods:

  • init(TridentTuple tuple): This method runs on each input tuple and converts it into a value of type T, typically by reading a field from the tuple.
  • combine(T val1, T val2): This method combines two values into one. Trident applies it repeatedly until a single value remains, which is emitted as a single tuple with a single field.
  • zero(): This method returns the value to use when the input partition contains no tuples.

The following example code shows how we can implement the Sum class using the CombinerAggregator interface:

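import clojure.lang.Numbers; // Clojure runtime helper, on the Storm 1.x classpath
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;

public class Sum implements CombinerAggregator<Number> {

  private static final long serialVersionUID = 1L;

  // Convert each input tuple into a value: its first field
  @Override
  public Number init(TridentTuple tridentTuple) {
    return (Number) tridentTuple.getValue(0);
  }

  // Merge two partial values; Numbers.add handles mixed numeric types
  @Override
  public Number combine(Number number1, Number number2) {
    return Numbers.add(number1, number2);
  }

  // Value used for an empty partition
  @Override
  public Number zero() {
    return 0;
  }

}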
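The network saving described above can be illustrated with a simplified plain-Java model of the two phases, using long values instead of Number and no Storm classes. The init, combine, and zero methods mirror the CombinerAggregator methods, but the class and the shipped-value counters are hypothetical; the counters only compare one partial value per partition against one value per tuple.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified plain-Java model of a CombinerAggregator sum for one batch.
// No Storm classes; the shipped-value counters are illustrative only.
final class CombinerSketch {

  static long init(long field) { return field; }        // tuple -> value
  static long combine(long a, long b) { return a + b; } // merge two values
  static long zero() { return 0L; }                     // empty partition

  // Phase 1 (local): each partition combines its own tuples into one value.
  static List<Long> partials(List<List<Long>> partitions) {
    List<Long> out = new ArrayList<>();
    for (List<Long> p : partitions) {
      long acc = zero();
      for (long field : p) {
        acc = combine(acc, init(field));
      }
      out.add(acc);
    }
    return out;
  }

  // Phase 2 (after global repartitioning): combine the partial results.
  static long total(List<List<Long>> partitions) {
    long acc = zero();
    for (long partial : partials(partitions)) {
      acc = combine(acc, partial);
    }
    return acc;
  }

  // Values sent during repartitioning: one partial per partition...
  static int valuesShippedCombiner(List<List<Long>> partitions) {
    return partitions.size();
  }

  // ...versus every tuple when the whole batch is repartitioned first.
  static int valuesShippedWithoutCombiner(List<List<Long>> partitions) {
    int n = 0;
    for (List<Long> p : partitions) {
      n += p.size();
    }
    return n;
  }

  public static void main(String[] args) {
    List<List<Long>> batch = Arrays.asList(
        Arrays.asList(1L, 2L, 3L),
        Arrays.asList(4L, 5L));
    System.out.println(partials(batch)); // [6, 9]
    System.out.println(total(batch));    // 15
    System.out.println(valuesShippedCombiner(batch) + " vs "
        + valuesShippedWithoutCombiner(batch)); // 2 vs 5
  }
}
```

The gap grows with partition size: the combiner always ships one value per partition, while the alternative ships one per tuple.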

The persistent aggregate

The persistent aggregate works on all tuples across all the batches in a stream and persists the aggregate result to a state source such as memory, Memcached, Cassandra, or another database. The following piece of code shows how we can use the persistentAggregate function:

mystream.persistentAggregate(new MemoryMapState.Factory(), new Fields("select"), new Count(), new Fields("count"));

We will discuss this in more detail in the Maintaining the topology state with Trident section.
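The core idea of persisting an aggregate across batches can be sketched in plain Java. In this minimal model, a HashMap stands in for the state source (such as the in-memory state created by MemoryMapState.Factory), each batch is counted, and the batch count is added to the value already stored. The class and its storage key are hypothetical, and the sketch ignores the transaction IDs that Trident's transactional and opaque states track.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of persistentAggregate with Count; no Storm classes.
// A HashMap stands in for the state source; the key name is hypothetical.
final class PersistentCountSketch {

  private static final String KEY = "count"; // hypothetical storage key
  private final Map<String, Long> store = new HashMap<>();

  // Processes one batch: count its tuples, then combine that partial count
  // with the value already persisted from earlier batches.
  void processBatch(List<?> batch) {
    long batchCount = batch.size();
    long stored = store.getOrDefault(KEY, 0L);
    store.put(KEY, stored + batchCount);
  }

  long current() {
    return store.getOrDefault(KEY, 0L);
  }

  public static void main(String[] args) {
    PersistentCountSketch state = new PersistentCountSketch();
    state.processBatch(Arrays.asList("a", "b", "c"));
    state.processBatch(Arrays.asList("d", "e"));
    System.out.println(state.current()); // 5
  }
}
```

Unlike aggregate, the result survives from one batch to the next because it lives in the store rather than in per-batch state.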

Aggregator chaining

Trident lets us apply multiple aggregators to the same input stream; this process is called aggregator chaining. The following piece of code shows how we can use aggregator chaining:

mystream.chainedAgg()
        .partitionAggregate(new Fields("b"), new Average(), new Fields("average"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd();

Here, we apply the Average() and Sum() aggregators to each partition. The output of the chain contains a single tuple for each input partition, with two fields: average and sum.

The following diagram shows how aggregator chaining works:

Working of aggregator chaining
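The chained result can be sketched in plain Java as well. The text does not show the Average aggregator, so this sketch assumes it computes the arithmetic mean of field b; the class names are hypothetical and no Storm classes are used. Each partition produces one Result holding the average and sum fields, in the order the aggregators appear in the chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of chainedAgg() with two partition aggregators
// (average and sum of field b); no Storm classes.
final class ChainedAggSketch {

  // Output tuple for one partition: fields (average, sum), in chain order.
  static final class Result {
    final double average;
    final long sum;

    Result(double average, long sum) {
      this.average = average;
      this.sum = sum;
    }

    @Override
    public String toString() {
      return "[" + average + ", " + sum + "]";
    }
  }

  // Both aggregators see the same tuples of a partition, and their results
  // are joined into a single output tuple per partition.
  static List<Result> run(List<List<Long>> partitionsOfB) {
    List<Result> out = new ArrayList<>();
    for (List<Long> p : partitionsOfB) {
      long sum = 0;
      for (long b : p) {
        sum += b;
      }
      // Assumption: an empty partition averages to 0.0
      double avg = p.isEmpty() ? 0.0 : (double) sum / p.size();
      out.add(new Result(avg, sum));
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<Long>> parts = Arrays.asList(
        Arrays.asList(2L, 4L, 6L),
        Arrays.asList(1L, 2L));
    System.out.println(run(parts)); // [[4.0, 12], [1.5, 3]]
  }
}
```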
