Resilient Distributed Datasets

The easiest way to create an RDD in Python is with the SparkContext.parallelize method. This method was also used earlier where we parallelized a collection of integers between 0 and 1000:

    rdd = sc.parallelize(range(1000))
# Result:
# PythonRDD[3] at RDD at PythonRDD.scala:48

The rdd collection will be divided into a number of partitions which, in this case, correspond to a default value of four (the default value can be changed using configuration options). To explicitly specify the number of partitions, one can pass an extra argument to parallelize:

    rdd = sc.parallelize(range(1000), 2)
rdd.getNumPartitions() # This function will return the number of partitions
# Result:
# 2

RDDs support a lot of functional programming operators, similar to what we used back in Chapter 6, Implementing Concurrency, with reactive programming and data streams (even though, in that case, the operators were designed to work on events over time rather than normal collections). For example, we may illustrate the basic map function which, by now, should be quite familiar. In the following code, we use map to calculate the square of a series of numbers:

    square_rdd = rdd.map(lambda x: x**2)
# Result:
# PythonRDD[5] at RDD at PythonRDD.scala:48

The map function will return a new RDD but won't compute anything just yet. In order to trigger the execution, you can use the collect method, which will retrieve all the elements in the collection, or take, which will return only the first ten elements:

    square_rdd.collect()
# Result:
# [0, 1, ... ]

square_rdd.take(10)
# Result:
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

For a comparison between PySpark, Dask, and the other parallel programming libraries we explored in the earlier chapters, we will reimplement the approximation of pi. In the PySpark implementation, we will first create two RDDs of random numbers using parallelize, then we combine the datasets using the zip function (this is equivalent to Python's zip), and we finally test whether the random points are inside the circle:

    import numpy as np

N = 10000
x = np.random.uniform(-1, 1, N)
y = np.random.uniform(-1, 1, N)

rdd_x = sc.parallelize(x)
rdd_y = sc.parallelize(y)

hit_test = rdd_x.zip(rdd_y).map(lambda xy: xy[0] ** 2 + xy[1] ** 2 < 1)
pi = 4 * hit_test.sum()/N

It's important to note that both the zip and map operations produce new RDDs and do not actually execute the instruction on the underlying data. In the preceding example, code execution is triggered when we call the hit_test.sum function, which returns an integer. This behavior is different from the Dask API where the whole computation (including the final result, pi) did not trigger the execution.

We can now move on to a more interesting application to demonstrate more RDD methods. We will learn how to count the number of visits each user of a website performs in a day. In a real-world scenario, the data would have been collected in a database and/or stored in a distributed filesystem, such as HDFS. However, in our example, we will generate some data that we will then analyze.

In the following code, we generate a list of dictionaries, each containing a user (selected among twenty users) and a timestamp. The steps to produce the dataset are as follows:

  1. Create a pool of 20 users (the users variable).
  2. Define a function that returns a random time between two dates.
  3. For 10,000 times, we choose a random user from our users pool and a random timestamp between the dates January 1, 2017 and January 7, 2017.
      import datetime

from uuid import uuid4
from random import randrange, choice

# We generate 20 users
n_users = 20
users = [uuid4() for i in range(n_users)]

def random_time(start, end):
'''Return a random timestamp between start date and end
date'''
# We select a number of seconds
total_seconds = (end - start).total_seconds()
return start +
datetime.timedelta(seconds=randrange(total_seconds))

start = datetime.datetime(2017, 1, 1)
end = datetime.datetime(2017, 1, 7)

entries = []
N = 10000
for i in range(N):
entries.append({
'user': choice(users),
'timestamp': random_time(start, end)
})

With the dataset at hand, we can start asking questions and use PySpark to find the answers. One common question is "How many times has a given user visited the website?." A naive way to compute this result can be achieved by grouping the entries RDD by user (using the groupBy operator) and counting how many items are present for each user. In PySpark, groupBy takes a function as argument to extract the grouping key for each element and returns a new RDD that contain tuples of the (key, group) form. In the following example, we use the user ID as the key for our groupBy, and we inspect the first element using first:

    entries_rdd = sc.parallelize(entries)
entries_rdd.groupBy(lambda x: x['user']).first()
# Result:
# (UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'),
# <pyspark.resultiterable.ResultIterable at 0x7faced4cd0b8>)

The return value of groupBy contains a ResultIterable (which is basically a list) for each user ID. To count the number of visits per user, it's sufficient to calculate the length of each ResultIterable:

    (entries_rdd
.groupBy(lambda x: x['user'])
.map(lambda kv: (kv[0], len(kv[1])))
.take(5))
# Result:
# [(UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'), 536),
# (UUID('d72c81c1-83f9-4b3c-a21a-788736c9b2ea'), 504),
# (UUID('e2e125fa-8984-4a9a-9ca1-b0620b113cdb'), 498),
# (UUID('b90acaf9-f279-430d-854f-5df74432dd52'), 561),
# (UUID('00d7be53-22c3-43cf-ace7-974689e9d54b'), 466)]

Even though this algorithm may work well in small datasets, groupBy requires us to collect and store the whole set of entries for each user in memory, and this can exceed the memory capacity of an individual node. Since we don't need the list but only the count, there's a better way to calculate this number without having to hold the list of visits for each user in memory.

When dealing with an RDD of (key, value) pairs, it is possible to use mapValues to apply a function only to the values. In the preceding code, we can replace the map(lambda kv: (kv[0], len(kv[1]))) call with mapValues(len) for better readability.

For a more efficient calculation, we can leverage the reduceByKey function, which will perform a step similar to the Reduce step that we saw in the An introduction to MapReduce section. The reduceByKey function can be called from an RDD of tuples that contain a key as their first element and a value as their second element, and accepts a function as its first argument that will calculate the reduction.  A simple example of the reduceByKey function is illustrated in the following snippet. We have a few string keys associated with integer numbers, and we want to get the sum of the values for each key; the reduction, expressed as a lambda, corresponds to the sum of the elements:

    rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)])
rdd.reduceByKey(lambda a, b: a + b).collect()
# Result:
# [('c', 5), ('b', 6), ('a', 4)]

The reduceByKey function is much more efficient than groupBy because the reduction is parallelizable and doesn't require the in-memory storage of the groups; also, it limits the data shuffled between Executors (it performs similar operations to Dask's foldby, which was explained earlier). At this point, we can rewrite our visit count calculation using reduceByKey:

    (entries_rdd
.map(lambda x: (x['user'], 1))
.reduceByKey(lambda a, b: a + b)
.take(3))
# Result:
# [(UUID('0604aab5-c7ba-4d5b-b1e0-16091052fb11'), 536),
# (UUID('d72c81c1-83f9-4b3c-a21a-788736c9b2ea'), 504),
# (UUID('e2e125fa-8984-4a9a-9ca1-b0620b113cdb'), 498)]

With Spark's RDD API, it is also easy to answer questions such as "How many visits did the website receive each day?." This can be computed using reduceByKey with the appropriate key (which is the date extracted from the timestamp). In the following example, we demonstrate the calculation. Also, note the usage of the sortByKey operator to return the counts sorted by date:

    (entries_rdd
.map(lambda x: (x['timestamp'].date(), 1))
.reduceByKey(lambda a, b: a + b)
.sortByKey()
.collect())
# Result:
# [(datetime.date(2017, 1, 1), 1685),
# (datetime.date(2017, 1, 2), 1625),
# (datetime.date(2017, 1, 3), 1663),
# (datetime.date(2017, 1, 4), 1643),
# (datetime.date(2017, 1, 5), 1731),
# (datetime.date(2017, 1, 6), 1653)]
..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.216.77.153