foldByKey

foldByKey can be considered as reduceByKey with an initial zero value. Just like reduceByKey, it uses an associative function to merge values for each key, but it also uses an initial zero value:

foldByKey(V zeroValue,Function2<V,V,V> func);

As per the Spark documentation, the initial value should be neutral; that is, it may be added to the result an arbitrary number of times without changing the final result. Examples are 0 for addition, 1 for multiplication, and an empty list for list concatenation.

The zero value acts as a mutable buffer. It is initialized once per key per partition: the first time a key is encountered in a partition, a zero value is initialized for it, and every subsequent value of that key in the same partition is merged into this buffer. So, instead of creating a new object every time, as in the case of reduceByKey, this mutable object is used to fold the values of a key. This reduces garbage collection pressure if object creation is expensive.

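The zero value should normally be neutral, but it can also be used as a quantitative value if some value needs to be added to the combined value of every key in every partition. In that case it is applied once per key per partition, so the final result depends on how the data is partitioned.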
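
The effect of a non-neutral zero value can be illustrated without a cluster. The following is a minimal plain-Java sketch that only simulates the behavior described above; it is not Spark's implementation. The class FoldByKeySketch, its helper methods, and the two hand-built partitions are hypothetical names and data introduced for illustration.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Hypothetical simulation of foldByKey semantics; not part of Spark.
public class FoldByKeySketch {

    // Folds each partition independently, starting every key from zeroValue,
    // then merges the per-partition results with the same function.
    static <K extends Comparable<K>, V> Map<K, V> foldByKey(
            List<List<Map.Entry<K, V>>> partitions, V zeroValue, BinaryOperator<V> func) {
        Map<K, V> merged = new TreeMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, V> local = new HashMap<>();
            for (Map.Entry<K, V> pair : partition) {
                // The first time a key is seen in this partition, start from the zero value.
                V acc = local.containsKey(pair.getKey()) ? local.get(pair.getKey()) : zeroValue;
                local.put(pair.getKey(), func.apply(acc, pair.getValue()));
            }
            // Merging across partitions does not involve the zero value again.
            for (Map.Entry<K, V> e : local.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), func);
            }
        }
        return merged;
    }

    static Map.Entry<String, Integer> pair(String word, Integer count) {
        return new AbstractMap.SimpleEntry<>(word, count);
    }

    // "Hello Spark" in one partition, "Hello Java" in another (assumed layout).
    static List<List<Map.Entry<String, Integer>>> sample() {
        return Arrays.asList(
                Arrays.asList(pair("Hello", 1), pair("Spark", 1)),
                Arrays.asList(pair("Hello", 1), pair("Java", 1)));
    }

    public static void main(String[] args) {
        System.out.println(foldByKey(sample(), 0, Integer::sum));  // {Hello=2, Java=1, Spark=1}
        System.out.println(foldByKey(sample(), 10, Integer::sum)); // {Hello=22, Java=11, Spark=11}
    }
}
```

With a zero value of 10, "Hello" occurs in both simulated partitions and therefore receives the extra 10 twice, while "Spark" and "Java" receive it once each. A different partition layout would give different totals, which is why a neutral zero value is the safe default.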

foldByKey can be used to run a word count as follows:

Java 7:

JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("Hello Spark", "Hello Java"));
JavaPairRDD<String, Integer> flatMapToPair = stringRDD.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer>> call(String t) throws Exception {
        List<Tuple2<String, Integer>> list = new ArrayList<>();
        for (String s : t.split(" ")) {
            list.add(new Tuple2<String, Integer>(s, 1));
        }
        return list.iterator();
    }
});
JavaPairRDD<String, Integer> foldByKey = flatMapToPair.foldByKey(0, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

Java 8:

JavaRDD<String> stringRDD = jsc.parallelize(Arrays.asList("Hello Spark", "Hello Java"));
JavaPairRDD<String, Integer> flatMapToPair = stringRDD.flatMapToPair(s -> Arrays.asList(s.split(" ")).stream()
    .map(token -> new Tuple2<String, Integer>(token, 1))
    .collect(Collectors.toList())
    .iterator());
flatMapToPair.foldByKey(0, (v1, v2) -> v1 + v2).collect();

foldByKey also provides the following overloaded methods:

foldByKey(V zeroValue, Partitioner partitioner, Function2<V,V,V> func);

Since foldByKey involves data shuffling, this variant allows the user to provide an instance of a partitioner, for example, a hash partitioner, a range partitioner, or a custom partitioner, based on the requirement.

foldByKey(V zeroValue,int numPartitions,Function2<V,V,V> func);

Here, the user provides the number of partitions. In this case, foldByKey creates an instance of Hash Partitioner with that number of partitions and calls the preceding variant.
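
To make the role of the number of partitions more concrete, the following plain-Java sketch shows how a hash-based partitioner can assign a key to a partition. It assumes the scheme usually attributed to Spark's hash partitioner, a non-negative modulo of the key's hashCode with null keys sent to partition 0. The class HashPartitionSketch and the method partitionFor are hypothetical names, not Spark APIs.

```java
// Hypothetical illustration of hash partitioning; not Spark's own class.
public class HashPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        if (key == null) {
            return 0; // assumption: null keys are placed in the first partition
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String word : new String[] {"Hello", "Spark", "Java"}) {
            System.out.println(word + " -> partition " + partitionFor(word, 2));
        }
    }
}
```

Because the partition depends only on the key's hash code and the number of partitions, all values of a given key end up in the same partition after the shuffle, which is what allows them to be folded together.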
