Accumulators

Accumulators are shared variables across executors that are typically used to add counters to your Spark program. Suppose you have a Spark program and want to know the number of errors, the total number of records processed, or both. You can do this in two ways. One way is to add extra logic just to count errors or total records, which becomes complicated when you try to handle every possible computation. The other way is to leave the logic and code flow largely intact and add Accumulators.

Accumulators can only be updated by adding to the value.

The following is an example of creating and using a long Accumulator by calling the longAccumulator function on the SparkContext, which initializes the newly created accumulator to zero. As the accumulator is updated inside the map transformation, it is incremented once per record. At the end of the operation, the Accumulator holds a value of 351.

scala> val acc1 = sc.longAccumulator("acc1")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355, name: Some(acc1), value: 0)

scala> val someRDD = statesPopulationRDD.map(x => {acc1.add(1); x})
someRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[99] at map at <console>:29

scala> acc1.value
res156: Long = 0 /*there has been no action on the RDD so accumulator did not get incremented*/

scala> someRDD.count
res157: Long = 351

scala> acc1.value
res158: Long = 351

scala> acc1
res145: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355, name: Some(acc1), value: 351)

There are built-in accumulators for many common use cases (a short usage sketch follows this list):

  • LongAccumulator: for computing the sum, count, and average of 64-bit integers
  • DoubleAccumulator: for computing the sum, count, and average of double-precision floating-point numbers
  • CollectionAccumulator[T]: for collecting a list of elements
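
The first REPL example above already shows LongAccumulator. As a quick sketch of the other two types, the following code creates a DoubleAccumulator and a CollectionAccumulator through the SparkContext and updates them from a small RDD; the names sumAcc, badRecordsAcc, and amountsRDD, as well as the sample data, are made up purely for illustration:

val sumAcc = sc.doubleAccumulator("sumAcc")
val badRecordsAcc = sc.collectionAccumulator[String]("badRecordsAcc")

//a small, made-up RDD of amounts; one entry is not a valid number
val amountsRDD = sc.parallelize(Seq("10.5", "20.0", "oops", "30.25"))

//valid amounts go into the DoubleAccumulator, bad records are collected
amountsRDD.foreach { s =>
  try sumAcc.add(s.toDouble)
  catch { case _: NumberFormatException => badRecordsAcc.add(s) }
}

sumAcc.sum          //60.75
sumAcc.avg          //20.25
badRecordsAcc.value //[oops]

DoubleAccumulator also exposes count in addition to sum and avg, and CollectionAccumulator returns its elements as a java.util.List.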

All the preceding Accumulators are built on top of the AccumulatorV2 class. By following the same logic, we can potentially build very complex and customized Accumulators to use in our project.

We can build a custom accumulator by extending the AccumulatorV2 class. The following example shows the functions that must be implemented. AccumulatorV2[Int, Int] in the following code means that both the input and the output are of Integer type:

class MyAccumulator extends AccumulatorV2[Int, Int] {
//returns true if this accumulator has its zero (initial) value
override def isZero: Boolean = ???

//function to copy one Accumulator and create another one
override def copy(): AccumulatorV2[Int, Int] = ???

//to reset the value
override def reset(): Unit = ???

//function to add a value to the accumulator
override def add(v: Int): Unit = ???

//logic to merge two accumulators
override def merge(other: AccumulatorV2[Int, Int]): Unit = ???

//the function which returns the value of the accumulator
override def value: Int = ???
}
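
To make the skeleton concrete before moving on to a more realistic example, here is one possible minimal implementation that simply sums Int values; the class name MySumAccumulator and its sum field are chosen only for illustration:

class MySumAccumulator extends AccumulatorV2[Int, Int] {
  private var sum = 0

  //the accumulator is at its zero value when nothing has been added
  override def isZero: Boolean = sum == 0

  //create a new accumulator carrying the current sum
  override def copy(): AccumulatorV2[Int, Int] = {
    val newAcc = new MySumAccumulator
    newAcc.sum = this.sum
    newAcc
  }

  //reset the accumulated sum back to zero
  override def reset(): Unit = sum = 0

  //add a single value to the running sum
  override def add(v: Int): Unit = sum += v

  //combine the partial sum from another accumulator
  override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value

  //expose the accumulated sum
  override def value: Int = sum
}

After registering an instance with sc.register, it can be updated with add just like the built-in accumulators.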

Next, we will look at a practical example of a custom accumulator, again using the statesPopulation CSV file. Our goal is to accumulate the sum of the years and the sum of the populations in a single custom accumulator.

Step 1. Import the package containing the AccumulatorV2 class:

import org.apache.spark.util.AccumulatorV2

Step 2. Case class to contain the Year and Population:

case class YearPopulation(year: Int, population: Long)

Step 3. StateAccumulator class extends AccumulatorV2:


class StateAccumulator extends AccumulatorV2[YearPopulation, YearPopulation] {
  //declare the two variables, an Int for the year and a Long for the population
  private var year = 0
  private var population: Long = 0L

  //return true if both year and population are zero
  override def isZero: Boolean = year == 0 && population == 0L

  //copy this accumulator and return a new accumulator
  override def copy(): StateAccumulator = {
    val newAcc = new StateAccumulator
    newAcc.year = this.year
    newAcc.population = this.population
    newAcc
  }

  //reset the year and population to zero
  override def reset(): Unit = { year = 0; population = 0L }

  //add a value to the accumulator
  override def add(v: YearPopulation): Unit = {
    year += v.year
    population += v.population
  }

  //merge two accumulators
  override def merge(other: AccumulatorV2[YearPopulation, YearPopulation]): Unit = {
    other match {
      case o: StateAccumulator =>
        year += o.year
        population += o.population
      case _ =>
    }
  }

  //function called by Spark to access the value of the accumulator
  override def value: YearPopulation = YearPopulation(year, population)
}

Step 4. Create a new StateAccumulator and register it with the SparkContext:

val statePopAcc = new StateAccumulator

sc.register(statePopAcc, "statePopAcc")

Step 5. Read the statesPopulation.csv as an RDD:


val statesPopulationRDD = sc.textFile("statesPopulation.csv").filter(_.split(",")(0) != "State")

scala> statesPopulationRDD.take(10)
res1: Array[String] = Array(Alabama,2010,4785492, Alaska,2010,714031, Arizona,2010,6408312, Arkansas,2010,2921995, California,2010,37332685, Colorado,2010,5048644, Delaware,2010,899816, District of Columbia,2010,605183, Florida,2010,18849098, Georgia,2010,9713521)

Step 6. Use the StateAccumulator:

statesPopulationRDD.map(x => {
  val toks = x.split(",")
  val year = toks(1).toInt
  val pop = toks(2).toLong
  statePopAcc.add(YearPopulation(year, pop))
  x
}).count
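
Because the map here transforms nothing and exists only to drive the accumulator, the same result can also be produced with an action such as foreach, which avoids materializing a throwaway RDD. This is just an alternative sketch of the step above; run one or the other, since running both would add every record twice:

statesPopulationRDD.foreach { x =>
  val toks = x.split(",")
  statePopAcc.add(YearPopulation(toks(1).toInt, toks(2).toLong))
}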

Step 7. Now, we can examine the value of the StateAccumulator:

scala> statePopAcc
res2: StateAccumulator = StateAccumulator(id: 0, name: Some(statePopAcc), value: YearPopulation(704550,2188669780))
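
Because value returns the YearPopulation case class, the accumulated totals can also be read field by field on the driver. The val names below are only illustrative:

//the accumulator's value is an ordinary case class instance
val totals = statePopAcc.value   //YearPopulation(704550,2188669780)
val totalYears = totals.year
val totalPopulation = totals.population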

In this section, we examined accumulators and how to build a custom accumulator. Using the preceding example as a template, you can create complex accumulators to meet your own needs.
