Accumulators

Accumulators are shared variables used to aggregate values from the executor nodes to the driver node. Some characteristics of accumulators are:

  1. Accumulators are write-only shared variables from the point of view of the executors.
  2. Accumulators support only operations that are associative and commutative.
  3. The executor nodes cannot read the accumulator's value; they can only add to it.
  4. The driver program has access to the accumulator's final value.
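
These characteristics can be illustrated without Spark at all. The following plain-Java sketch is only a model: the class name AccumulatorModel, the helpers countExceptions and merge, and the sample log lines are hypothetical and are not part of the Spark API. Each "task" counts into its own local total and the "driver" merges the partial totals; because addition is associative and commutative, the order in which the partial totals arrive does not change the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of accumulator semantics; no Spark classes are used.
public class AccumulatorModel {

    // A "task" counts the matching lines of one partition into a local total.
    public static long countExceptions(List<String> partition) {
        long local = 0;
        for (String line : partition) {
            if (line.contains("Exception")) {
                local += 1;
            }
        }
        return local;
    }

    // The "driver" merges the partial totals in whatever order they arrive.
    public static long merge(List<Long> partials) {
        long total = 0;
        for (long partial : partials) {
            total += partial;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical log lines, split into three partitions.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("INFO start", "NullPointerException at A"),
                Arrays.asList("IOException at B", "IllegalStateException at C"),
                Arrays.asList("INFO done"));

        List<Long> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(countExceptions(partition));
        }

        long inOrder = merge(partials);
        Collections.reverse(partials);
        long reversed = merge(partials);

        // Both totals are 3, regardless of merge order.
        System.out.println("in order: " + inOrder + ", reversed: " + reversed);
    }
}
```

An operation such as subtraction would not be safe here, because the total would then depend on the order in which the partial results are merged.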

Although accumulators can be used in both transformations and actions, Spark guarantees that each task's update is applied exactly once only when the accumulator is updated inside an action. Accumulators updated in a transformation can be incremented more than once if a task fails and is re-executed or if speculative execution launches a duplicate task, however rare that may be.
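
To make the difference concrete, here is a simplified plain-Java model of the bookkeeping; it is not Spark's actual scheduler code. The names RetryModel, Attempt, mergeOncePerPartition, and mergeEveryAttempt are hypothetical. The sketch assumes that partition 1 is executed twice (for example, through a retry or a speculative copy). It compares merging one update per partition, which is the guarantee for updates made in actions, with merging every completed attempt, which can happen for updates made in transformations.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of how duplicate task attempts can affect an accumulator.
public class RetryModel {

    // One completed task attempt: the partition it processed and its local count.
    public static final class Attempt {
        final int partitionId;
        final long localCount;

        public Attempt(int partitionId, long localCount) {
            this.partitionId = partitionId;
            this.localCount = localCount;
        }
    }

    // Action-style: only the first completed attempt of each partition is merged.
    public static long mergeOncePerPartition(List<Attempt> attempts) {
        Set<Integer> seen = new HashSet<>();
        long total = 0;
        for (Attempt attempt : attempts) {
            if (seen.add(attempt.partitionId)) {
                total += attempt.localCount;
            }
        }
        return total;
    }

    // Transformation-style worst case: every completed attempt is merged.
    public static long mergeEveryAttempt(List<Attempt> attempts) {
        long total = 0;
        for (Attempt attempt : attempts) {
            total += attempt.localCount;
        }
        return total;
    }

    public static void main(String[] args) {
        // Partition 1 appears twice because its task was executed twice.
        List<Attempt> attempts = Arrays.asList(
                new Attempt(0, 2), new Attempt(1, 3), new Attempt(1, 3));

        System.out.println("once per partition: " + mergeOncePerPartition(attempts)); // 5
        System.out.println("every attempt:      " + mergeEveryAttempt(attempts));     // 8
    }
}
```

In this model the duplicate attempt inflates the total from 5 to 8, which is why counts that must be exact are best updated inside an action such as foreach.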

Spark has some built-in accumulators, such as longAccumulator(), doubleAccumulator(), and collectionAccumulator(), to perform basic aggregation over standard data types. Accumulators play a role similar to that of counters in MapReduce. Let's work through the following example to understand accumulators. In our use case, we have an application log file with multiple exception entries and, depending on how many exceptions occur, a certain predefined task can be called. There are several ways to approach such a problem, and using an accumulator is one of them:

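import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.util.LongAccumulator;

// sparkContext is an existing JavaSparkContext.
LongAccumulator longAccumulator = sparkContext.sc().longAccumulator("ExceptionCounter");
JavaRDD<String> textFile = sparkContext.textFile("src/main/resources/logFileWithException.log");
// foreach is an action, so each task's update is applied exactly once.
textFile.foreach(new VoidFunction<String>() {
    @Override
    public void call(String line) throws Exception {
        if (line.contains("Exception")) {
            longAccumulator.add(1);
            // Inside a task, value() reflects only that task's local copy, not the global total.
            System.out.println("The intermediate value in loop " + longAccumulator.value());
        }
    }
});
// Only the driver sees the merged, final value.
System.out.println("The final value of Accumulator : " + longAccumulator.value());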

While the values of longAccumulator() and doubleAccumulator() are a long and a double respectively, collectionAccumulator() returns a list of the added elements. So, we repeat our previous use case with a collectionAccumulator() instead of a longAccumulator(), and we get a list in which each entry represents one added element:

// Requires: import org.apache.spark.util.CollectionAccumulator;
CollectionAccumulator<Long> collectionAccumulator = sparkContext.sc().collectionAccumulator();
textFile.foreach(new VoidFunction<String>() {
    @Override
    public void call(String line) throws Exception {
        if (line.contains("Exception")) {
            // Each matching line appends one element to the list.
            collectionAccumulator.add(1L);
            System.out.println("The intermediate value in loop " + collectionAccumulator.value());
        }
    }
});
System.out.println("The final value of Accumulator : " + collectionAccumulator.value());

Custom accumulators can also be written by subclassing the AccumulatorV2 class. AccumulatorV2 is an abstract class that accumulates inputs of type IN and produces output of type OUT. OUT should be a type that can be read atomically (for example, int or long) or thread-safely (for example, synchronized collections), because it will be read from other threads. Unlike built-in accumulators, custom accumulators must be registered with the SparkContext through its register() method; only then do they behave like shared variables. The AccumulatorV2 class also provides a helper method, isRegistered(), which returns a Boolean indicating whether the accumulator has been registered. Let's reuse our previous use case and build a custom accumulator that accepts a string value but returns a list of integers:

import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.spark.util.AccumulatorV2;

// Custom accumulator: accepts String input and exposes a thread-safe list of Integers.
public class ListAccumulator extends AccumulatorV2<String, CopyOnWriteArrayList<Integer>> {
    private static final long serialVersionUID = 1L;
    private CopyOnWriteArrayList<Integer> accList;

    public ListAccumulator() {
        accList = new CopyOnWriteArrayList<Integer>();
    }

    public ListAccumulator(CopyOnWriteArrayList<Integer> value) {
        // Always initialize the list, even for an empty input, so that copy() never yields a null list.
        accList = new CopyOnWriteArrayList<Integer>(value);
    }

    @Override
    public void add(String arg) {
        // Assumes a non-empty arg is a parsable integer; otherwise parseInt throws NumberFormatException.
        if (!arg.isEmpty()) {
            accList.add(Integer.parseInt(arg));
        }
    }

    @Override
    public AccumulatorV2<String, CopyOnWriteArrayList<Integer>> copy() {
        return new ListAccumulator(value());
    }

    @Override
    public boolean isZero() {
        return accList.isEmpty();
    }

    @Override
    public void merge(AccumulatorV2<String, CopyOnWriteArrayList<Integer>> other) {
        // Append the other accumulator's elements to this one.
        accList.addAll(other.value());
    }

    @Override
    public void reset() {
        accList = new CopyOnWriteArrayList<Integer>();
    }

    @Override
    public CopyOnWriteArrayList<Integer> value() {
        return accList;
    }
}