A Trident aggregator performs an aggregation operation on an input batch, partition, or stream. For example, a user who wants to count the number of tuples in each batch can apply the Count aggregator to the stream. The output of an aggregator completely replaces the input tuple. There are three types of aggregators available in Trident:
Partition aggregate
Aggregate
Persistent aggregate
Let's understand each type of aggregator in detail.
As the name suggests, the partition aggregate works on each partition instead of the entire batch. The output of the partition aggregate completely replaces the input tuple and contains a single-field tuple. The following code shows how we can use the partitionAggregate method:
mystream.partitionAggregate(new Fields("x"), new Count(), new Fields("count"));
For example, we have an input stream that contains the x and y fields, and we apply a partitionAggregate function on each partition; the output tuples contain a single field called count. The count field represents the number of tuples present in the input partition. The following diagram shows how the partitionAggregate function works:
An aggregate works on each batch. During the aggregate process, the tuples are first repartitioned using the global operation, which combines all partitions of the same batch into a single partition. The aggregation function then runs on each batch. The following code shows how we can use the aggregate function:
mystream.aggregate(new Fields("x"), new Count(), new Fields("count"));
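The difference between the two calls can be traced in plain Java. The following is only a sketch of the semantics described above, not Trident code: the class and method names are made up for illustration, and a batch is modelled as a list of partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of one batch; no Storm classes are used.
final class PartitionVsBatchCount {

    // Models partitionAggregate with a count: one result per partition.
    static List<Long> countPerPartition(List<List<String>> batch) {
        List<Long> counts = new ArrayList<>();
        for (List<String> partition : batch) {
            counts.add((long) partition.size());
        }
        return counts;
    }

    // Models aggregate with a count: merge all partitions of the batch
    // first (the role of the global repartitioning), then count once.
    static long countPerBatch(List<List<String>> batch) {
        List<String> merged = new ArrayList<>();
        for (List<String> partition : batch) {
            merged.addAll(partition);
        }
        return merged.size();
    }

    public static void main(String[] args) {
        List<List<String>> batch = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d"),
                Arrays.asList("e", "f"));
        System.out.println(countPerPartition(batch)); // [3, 1, 2]
        System.out.println(countPerBatch(batch));     // 6
    }
}
```

The partition-level call yields one count per partition, whereas the batch-level call yields a single count for the whole batch.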
Trident provides three aggregator interfaces:
ReducerAggregator
Aggregator
CombinerAggregator
All three interfaces can also be used with the partition aggregate.
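The ReducerAggregator interface first runs the global repartitioning operation on the input stream to combine all the partitions of the same batch into a single partition, and then runs the aggregation function on each batch. The ReducerAggregator<T> interface contains the following methods:
init(): This method returns the initial value.
reduce(T curr, TridentTuple tuple): This method iterates over the input tuples, folding each tuple into the current value, and produces a single tuple with a single field as output.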
The following example code shows how we can implement a Sum class using the ReducerAggregator interface:
public static class Sum implements ReducerAggregator<Long> {
    private static final long serialVersionUID = 1L;

    // Return the initial value, zero.
    @Override
    public Long init() {
        return 0L;
    }

    // Iterate over the input tuples, calculate the sum, and
    // produce a single tuple with a single field as output.
    @Override
    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + tuple.getLong(0);
    }
}
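To see how the two methods cooperate, here is a minimal plain-Java sketch of the fold that such an aggregator performs over a batch. The SimpleReducer interface and the runBatch driver are hypothetical stand-ins written for this illustration; they are not part of Trident, and a tuple is reduced to the single long value it would carry in field 0.

```java
import java.util.Arrays;
import java.util.List;

final class ReducerSketch {

    // Hypothetical stand-in for the init()/reduce() contract.
    interface SimpleReducer<T> {
        T init();
        T reduce(T curr, long tupleValue);
    }

    static final class Sum implements SimpleReducer<Long> {
        public Long init() { return 0L; }
        public Long reduce(Long curr, long tupleValue) { return curr + tupleValue; }
    }

    // Assumed driver: start from init(), then fold in every tuple of the batch.
    static <T> T runBatch(SimpleReducer<T> reducer, List<Long> batch) {
        T acc = reducer.init();
        for (long value : batch) {
            acc = reducer.reduce(acc, value);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(new Sum(), Arrays.asList(4L, 5L, 6L))); // 15
    }
}
```

For an empty batch the driver simply returns the initial value, which is why init() should return the identity of the operation (zero for a sum).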
The Aggregator interface first runs the global repartitioning operation on the input stream to combine all the partitions of the same batch into a single partition, and then runs the aggregation function on each batch. By definition, the Aggregator interface looks very similar to the ReducerAggregator interface. The BaseAggregator<State> class, which implements it, contains the following methods:
init(Object batchId, TridentCollector collector): The init() method is called before the processing of a batch starts. It returns the State object, which we will use to save the state of the batch. This object is used by the aggregate() and complete() methods.
aggregate(State s, TridentTuple tuple, TridentCollector collector): This method is called for each tuple of the given batch. It updates the state in the State object after processing each tuple.
complete(State state, TridentCollector tridentCollector): This method is called once all tuples of the given batch have been processed. It emits a single tuple corresponding to each batch.
The following example shows how we can implement the SumAsAggregator class by extending the BaseAggregator class:
public static class SumAsAggregator extends BaseAggregator<SumAsAggregator.State> {
    private static final long serialVersionUID = 1L;

    // State class
    static class State {
        long count = 0;
    }

    // Initialize the state.
    public State init(Object batchId, TridentCollector collector) {
        return new State();
    }

    // Maintain the running sum in the count variable.
    public void aggregate(State state, TridentTuple tridentTuple, TridentCollector tridentCollector) {
        state.count = tridentTuple.getLong(0) + state.count;
    }

    // Emit a tuple with a single value as output
    // after processing all the tuples of the given batch.
    public void complete(State state, TridentCollector tridentCollector) {
        tridentCollector.emit(new Values(state.count));
    }
}
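A mutable state object pays off when the result depends on more than one running value. The sketch below walks one batch through the same three phases for an average, in plain Java. Everything in it is assumed for illustration: the State fields, the static methods, and the list that plays the role of the collector are not Trident classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StatefulAggregatorSketch {

    // Hypothetical per-batch state: an average needs two running values.
    static final class State {
        long sum = 0;
        long count = 0;
    }

    // Called once before the batch is processed.
    static State init() {
        return new State();
    }

    // Called once per tuple; mutates the state in place.
    static void aggregate(State state, long tupleValue) {
        state.sum += tupleValue;
        state.count++;
    }

    // Called once after the last tuple; emits into the collector.
    // Emitting nothing for an empty batch is a choice made for this sketch.
    static void complete(State state, List<Double> collector) {
        if (state.count > 0) {
            collector.add((double) state.sum / state.count);
        }
    }

    // Assumed driver that walks one batch through the three phases.
    static List<Double> runBatch(List<Long> batch) {
        List<Double> collector = new ArrayList<>();
        State state = init();
        for (long value : batch) {
            aggregate(state, value);
        }
        complete(state, collector);
        return collector;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(Arrays.asList(2L, 4L, 9L))); // [5.0]
    }
}
```

Because the result is emitted through a collector rather than returned, this style of aggregator can emit zero, one, or several tuples per batch.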
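The CombinerAggregator interface first runs the partition aggregate on each partition, then runs the global repartitioning operation to combine all the partitions of the same batch into a single partition, and then reruns the aggregator on the final partition to emit the desired output. Because each partition is reduced to a partial result before repartitioning, the CombinerAggregator interface transfers less data over the network than the other two aggregators. Hence, its overall performance is better than that of the Aggregator and ReducerAggregator interfaces. The CombinerAggregator<T> interface contains the following methods:
init(TridentTuple tuple): This method is run on each input tuple and returns the value to be combined.
combine(T val1, T val2): This method combines two values into one and is applied until only one value is left.
zero(): This method returns the value to emit when there are no tuples to aggregate.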
The following example code shows how we can implement the Sum class using the CombinerAggregator interface:
public class Sum implements CombinerAggregator<Number> {
    private static final long serialVersionUID = 1L;

    // Extract the value to aggregate from a single input tuple.
    public Number init(TridentTuple tridentTuple) {
        return (Number) tridentTuple.getValue(0);
    }

    // Combine two partial sums into one.
    // Numbers refers to clojure.lang.Numbers.
    public Number combine(Number number1, Number number2) {
        return Numbers.add(number1, number2);
    }

    // Value used when there are no tuples to aggregate.
    public Number zero() {
        return 0;
    }
}
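The two-stage execution described above can be modelled in plain Java. This is a sketch under simplifying assumptions, not the way Trident schedules the work: the static init, combine, and zero methods are hypothetical stand-ins operating on long values, and a batch is a list of partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class CombinerSketch {

    // Hypothetical stand-ins for init(), combine() and zero() of a sum.
    static long init(long tupleValue) { return tupleValue; }
    static long combine(long a, long b) { return a + b; }
    static long zero() { return 0L; }

    // Stage 1: each partition is combined locally into one partial value.
    static long combinePartition(List<Long> partition) {
        long acc = zero();
        for (long value : partition) {
            acc = combine(acc, init(value));
        }
        return acc;
    }

    // Stage 2: only the partials (one per partition) are brought
    // together and combined again into the batch result.
    static long combineBatch(List<List<Long>> batch) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> partition : batch) {
            partials.add(combinePartition(partition));
        }
        long acc = zero();
        for (long partial : partials) {
            acc = combine(acc, partial);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<List<Long>> batch = Arrays.asList(
                Arrays.asList(1L, 2L, 3L),
                Collections.<Long>emptyList(),
                Arrays.asList(10L));
        System.out.println(combineBatch(batch)); // 16
    }
}
```

In this model only three partial values reach the second stage instead of four tuples; with large partitions the saving grows accordingly. The approach relies on combine being associative, so that combining partials gives the same result as combining all tuples directly.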
The persistent aggregate works on all tuples across all the batches in a stream and persists the aggregate result to the source of state (memory, Memcached, Cassandra, or some other database). The following code shows how we can use the persistentAggregate function:
mystream.persistentAggregate(new MemoryMapState.Factory(), new Fields("select"), new Count(), new Fields("count"));
We will discuss more on this in the Maintaining the topology state with Trident section.
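As a rough illustration of what "across all the batches" means, the following plain-Java sketch keeps a running count in a map that outlives each batch. The class, its method, and the "count" key are invented for this example, and the sketch deliberately leaves out everything a real state implementation does around storage and replayed batches.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PersistentCountSketch {

    // Stand-in for the source of state; it survives from batch to batch.
    private final Map<String, Long> state = new HashMap<>();

    // Fold one batch into the stored running count and return the new total.
    long applyBatch(List<String> batch) {
        long batchCount = batch.size();                 // aggregate of this batch
        long stored = state.getOrDefault("count", 0L);  // value persisted so far
        long updated = stored + batchCount;
        state.put("count", updated);                    // persist the new value
        return updated;
    }

    public static void main(String[] args) {
        PersistentCountSketch sketch = new PersistentCountSketch();
        System.out.println(sketch.applyBatch(Arrays.asList("a", "b"))); // 2
        System.out.println(sketch.applyBatch(Arrays.asList("c")));      // 3
    }
}
```

Unlike the batch-level aggregate, the result after the second batch includes the tuples of the first one.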
Trident provides us with a feature to apply multiple aggregators on the same input stream, and this process is called aggregator chaining. The following piece of code shows how we can use aggregator chaining:
mystream.chainedAgg()
    .partitionAggregate(new Fields("b"), new Average(), new Fields("average"))
    .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
    .chainEnd();
We have applied the Average() and Sum() aggregators on each partition. The output of the chainedAgg() function contains a single tuple corresponding to each input partition. The output tuple contains two fields, sum and average.
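The shape of the chained output can be sketched in plain Java as follows. The helper methods are hypothetical and only model the result for one partition; returning 0.0 as the average of an empty partition is an assumption of this sketch, not documented Trident behaviour.

```java
import java.util.Arrays;
import java.util.List;

final class ChainedAggregationSketch {

    static long sum(List<Long> partition) {
        long total = 0;
        for (long value : partition) {
            total += value;
        }
        return total;
    }

    // Assumption: an empty partition yields 0.0.
    static double average(List<Long> partition) {
        return partition.isEmpty() ? 0.0 : (double) sum(partition) / partition.size();
    }

    // One output tuple per partition, with one field per chained
    // aggregator, in the order in which the aggregators were chained.
    static List<Object> chain(List<Long> partition) {
        return Arrays.<Object>asList(average(partition), sum(partition));
    }

    public static void main(String[] args) {
        System.out.println(chain(Arrays.asList(2L, 4L, 9L))); // [5.0, 15]
    }
}
```

Both aggregators read the same input tuples, and their results are placed side by side in a single output tuple.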
The following diagram shows how aggregator chaining works: