Manipulating your RDD in Python

Spark's Python API is more limited than its Java and Scala APIs, but it supports most of the core functionality.

The hallmarks of a MapReduce system are its two core operations: map and reduce. You've seen the map function used in the past chapters. The map function takes a function that operates on each individual element of the input RDD and produces a new output element. For example, to produce a new RDD where one has been added to every number, you would use rdd.map(lambda x: x+1). It's important to understand that map and the other Spark functions do not transform the existing elements; rather, they return a new RDD with the new elements. The reduce function takes a function that combines pairs of elements until all the data has been merged into a single result, which is returned to the calling program. To sum all the elements, you would use rdd.reduce(lambda x, y: x+y).
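Here is a minimal sketch of both operations, assuming a running SparkContext named sc:

rdd = sc.parallelize([1, 2, 3, 4])

# map returns a new RDD; the input RDD is left unchanged
incremented = rdd.map(lambda x: x + 1)   # 2, 3, 4, 5

# reduce combines pairs of elements and returns the result to the driver
total = rdd.reduce(lambda x, y: x + y)   # 10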

The flatMap function is a useful utility that lets you write a function which returns an Iterable of the type you want and then flattens the results. A simple example is parsing input where some records may fail to parse: the function you pass to flatMap can return an empty list on failure or a single-element list on success, as sketched below. In addition to reduce, there is a corresponding reduceByKey function that works on RDDs of key-value pairs and produces another RDD.
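A minimal sketch of that parse pattern, again assuming a SparkContext named sc:

def safe_parse(line):
    # Return a one-element list on success and an empty list on
    # failure, so flatMap silently drops the records that fail
    try:
        return [int(line)]
    except ValueError:
        return []

parsed = sc.parallelize(["1", "oops", "3"]).flatMap(safe_parse)   # 1, 3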

Many of the mapping operations are also defined with a partition variant. In this case, the function you need to provide takes and returns an Iterator object that represents all the data on that partition. This can be quite useful if the operation you need to perform has to do extensive work on each partition, for example, establishing a connection to a backend server.

Often, your data can be expressed as key-value mappings. As such, many of the functions defined on the Python RDD class only work if your data is in that form. The mapValues function is used when you only want to transform the value of each key-value pair while leaving the key (and the partitioning) unchanged, as in the sketch below.
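A minimal sketch of mapValues:

pairs = sc.parallelize([("a", 1), ("b", 2)])

# Only the values are transformed; the keys are preserved
doubled = pairs.mapValues(lambda v: v * 2)   # ("a", 2), ("b", 4)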

Another variant of the traditional map function is mapPartitions, which works on a per-partition level. The primary reason for using mapPartitions is to perform setup work for your map function once per partition when that setup can't be serialized across the network. Good examples are creating an expensive connection to a backend service or parsing an expensive side input.

def f(iterator):
    # Expensive per-partition setup work goes here, for example,
    # opening a connection to a backend service
    for i in iterator:
        yield per_element_function(i)
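
The function is then applied once per partition rather than once per element (per_element_function above stands in for whatever per-record work you need):

result = rdd.mapPartitions(f)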

In addition to simple operations on the data, Spark provides support for broadcast and accumulator values. Broadcast values can be used to ship a read-only value to all the partitions once, which saves having to reserialize a given value multiple times. Accumulators allow all the partitions to add to a shared value whose result can then be read on the master. You can create an accumulator with counter = sc.accumulator(initialValue); if you want custom behavior, you can also provide an AccumulatorParam as an argument to the accumulator function. The accumulator can then be incremented with counter += x on any of the workers, and the resulting value is read with counter.value. A broadcast value is created with bc = sc.broadcast(value) and then accessed as bc.value by any worker. The accumulator's value can only be read on the master, while the broadcast value can be read on all the partitions.
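A minimal sketch of both features (the lookup table and the counting of bad keys are illustrative assumptions):

# Broadcast a read-only lookup table to every worker once
lookup = sc.broadcast({"a": 1, "b": 2})

# An accumulator the workers add to and the master reads
bad_keys = sc.accumulator(0)

def score(key):
    if key not in lookup.value:   # read-only access on the workers
        bad_keys.add(1)           # workers may only add, not read
        return 0
    return lookup.value[key]

scores = sc.parallelize(["a", "b", "z"]).map(score).collect()
print(bad_keys.value)   # read on the master once the job has run; prints 1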

Standard RDD functions

The following table explains some of the functions that are available on all RDDs in Python:

| Name | Parameters | Purpose |
| --- | --- | --- |
| flatMap | (f, preservesPartitioning=False) | Takes a function that returns an Iterator of type U for each input of type T and returns a flattened RDD of type U. |
| mapPartitions | (f, preservesPartitioning=False) | Takes a function that maps an Iterator of type T to an Iterator of type U and returns an RDD of type U. For example, a function that takes an iterator of integers and returns an iterator of strings, called on an RDD of integers, gives back an RDD of strings. Useful for map operations with expensive per-machine setup work. |
| filter | (f) | Returns an RDD containing only the elements for which the function returns true. |
| distinct | () | Returns an RDD with the distinct elements (for example, entering 1, 1, and 2 will output 1, 2). |
| union | (other) | Returns the union of the two RDDs. |
| cartesian | (other) | Returns the Cartesian product of the RDD with the other RDD. |
| groupBy | (f, numPartitions=None) | Returns an RDD with the elements grouped by the value that f outputs for each of them. |
| pipe | (command, env={}) | Pipes each element of the RDD through the provided command and returns an RDD of the results. |
| foreach | (f) | Applies the function f to each element in the RDD. |
| reduce | (f) | Reduces the elements using the provided function. |
| fold | (zeroValue, op) | Folds each partition individually, starting from zeroValue, and then folds the per-partition results together. |
| countByValue | () | Returns a dictionary mapping each distinct value to the number of times it occurs in the RDD. |
| take | (num) | Returns a list of num elements. This can be slow for large values of num, so use collect if you want to get back the entire RDD. |
| partitionBy | (numPartitions, partitionFunc=hash) | Makes a new RDD partitioned by the provided partitioning function. The partitionFunc argument only needs to map each input key to an integer; partitionBy then takes that value mod numPartitions. |
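To make a few of these concrete, here is a small sketch exercising several of the above (element order in the results is not guaranteed):

nums = sc.parallelize([1, 1, 2, 3, 4])

evens = nums.filter(lambda x: x % 2 == 0)    # 2, 4
unique = nums.distinct()                     # 1, 2, 3, 4
total = nums.fold(0, lambda x, y: x + y)     # 11
first_two = nums.take(2)                     # [1, 1]
counts = nums.countByValue()                 # {1: 2, 2: 1, 3: 1, 4: 1}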

PairRDD functions

The following table explains some of the functions that are only available on RDDs of key-value pairs:

| Name | Parameters | Purpose |
| --- | --- | --- |
| collectAsMap | () | Returns a dictionary consisting of all the key-value pairs in the RDD. |
| reduceByKey | (func, numPartitions=None) | The per-key parallel version of reduce: merges the values for each key using the provided function and returns an RDD of the results. |
| countByKey | () | Returns a dictionary of the number of elements for each key. |
| join | (other, numPartitions=None) | Joins an RDD with another RDD. The result only contains keys that are present in both RDDs; the value stored for each key is a tuple of the values from the two RDDs. |
| rightOuterJoin | (other, numPartitions=None) | Joins an RDD with another RDD, outputting a pair for every key present in the other RDD. If a key is not present in the source RDD, the first value in the tuple will be None. |
| leftOuterJoin | (other, numPartitions=None) | Joins an RDD with another RDD, outputting a pair for every key present in the source RDD. If a key is not present in the other RDD, the second value in the tuple will be None. |
| combineByKey | (createCombiner, mergeValue, mergeCombiners) | Combines elements by key, turning an RDD of type (K, V) into an RDD of type (K, C). createCombiner turns something of type V into something of type C, mergeValue adds a V to a C, and mergeCombiners combines two C values into a single C. |
| groupByKey | (numPartitions=None) | Groups the values in the RDD by their keys. |
| cogroup | (other, numPartitions=None) | Joins two (or more) RDDs by their shared keys. If a key is missing from one RDD but present in another, the corresponding list is simply empty. |
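A short sketch of several of these pair functions in action (the pets and ages data is illustrative):

pets = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 2)])
ages = sc.parallelize([("cat", 3), ("fish", 1)])

sums = pets.reduceByKey(lambda x, y: x + y)   # ("cat", 3), ("dog", 1)
joined = pets.join(ages)                      # ("cat", (1, 3)), ("cat", (2, 3))
left = pets.leftOuterJoin(ages)               # includes ("dog", (1, None))

# combineByKey computing a (sum, count) pair per key, for example, for averages
sum_counts = pets.combineByKey(
    lambda v: (v, 1),                            # createCombiner: V -> C
    lambda c, v: (c[0] + v, c[1] + 1),           # mergeValue: add a V to a C
    lambda a, b: (a[0] + b[0], a[1] + b[1]))     # mergeCombiners: merge two Cs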
