Chapter 2. Data Design Patterns

The goal of this chapter is to introduce some informal but practical data design patterns. I will discuss only the design patterns that are useful in production environments, and will not cover purely theoretical design patterns, which are not deployed in production big data solutions.

This chapter

  • Introduces the basic concepts of design patterns in the context of big data

  • Provides some useful and practical design patterns by examples

  • Shows how to use Spark’s transformations for implementing data design patterns

  • Introduces the concept of “monoids” for a better understanding of reduction transformations

The best-known design patterns book is the iconic computer science book “Design Patterns: Elements of Reusable Object-Oriented Software” by four authors: Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides (known as the “Gang of Four”). I will not present data design patterns in the style of the “Gang of Four” book, but will focus on practical data design patterns. My presentation is informal and practical, and these patterns have been used in production environments.

What is a design pattern? According to Wikipedia: “In software engineering, a design pattern is a general reusable solution to a commonly occurring problem within a given context in software design. A design pattern is not a finished design that can be transformed directly into source or machine code.”

Spark, MapReduce, and distributed programming design patterns are common patterns in data-related problem solving. MapReduce is a programming model (for example, Hadoop is a concrete implementation of MapReduce) and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. A typical MapReduce job consists of a driver and three functions:

  • map(): a mapper, which supports an equivalent of Spark’s map(), flatMap(), and filter()

  • reduce(): a reducer, which supports an equivalent of Spark’s reduceByKey() and groupByKey()

  • combine(): an optional local reducer

For example, in software engineering, the “singleton” pattern is a design pattern that restricts the instantiation of a class to one object. In MapReduce, InMapper Combiner is a design pattern that performs most of the combine() functionality inside the mappers (in order to avoid generating too many intermediate (key, value) pairs). A combiner, also known as a semi-reducer, is an optional operation in the MapReduce paradigm that accepts the inputs from the mappers and then passes the output key-value pairs to the reducers. The main function of a combiner is to summarize the mapper outputs that share the same key. Another benefit of design patterns is that they make the objective of the code easier to understand and document the known scalability issues (if any), performance profiles, and limitations of solutions.

To be practical, I will focus only on data design patterns that can help us develop big data solutions that can be deployed to production environments without scalability problems: patterns that help us write scalable solutions to run on Spark clusters. Overall, there is no silver bullet in adopting design patterns and data design patterns; every adopted pattern should be tested in detail (in an environment similar to production) for performance and scalability using real data.

For learning design patterns in software engineering, you should look at the “Gang of Four” book (Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma, et al). For learning design patterns in MapReduce, you should look at the “MapReduce Design Patterns” book by Donald Miner and Adam Shook and my book “Data Algorithms” (published by O’Reilly).

Below is a partial list of data design patterns; I will cover some of them in this chapter.

  • MinMax

  • Top-10

  • Secondary Sorting

  • Stripes

  • InMapper Combining

  • Join patterns (inner-join, left-join, …)

  • Summarization Patterns

  • Filtering Patterns

  • Data Organization Patterns

  • Duplicate Detection

Next, I discuss the InMapper Combining data design pattern with some simple examples.

InMapper Combining

InMapper Combining refers to having a mapper combine and summarize its own outputs as much as possible and emit fewer intermediate (key, value) pairs for the sort and shuffle phase and the reducers (such as reduceByKey(), groupByKey(), …). For example, for the classic word count problem, given an input record such as:

"fox jumped and fox jumped again fox"

Without using an InMapper Combiner, we will generate the following (key, value) pairs (before handing them to the shufflers and then the reducers):

(fox, 1)
(jumped, 1)
(and, 1)
(fox, 1)
(jumped, 1)
(again, 1)
(fox, 1)

The problem is that for big data, we might generate too many (word, 1) pairs, which can keep the cluster network too busy and be less efficient. Using the InMapper Combiner data design pattern, we combine (summarize) the mapper outputs to generate fewer (key, value) pairs. For example, since there are three instances of (fox, 1), they are combined into (fox, 3):

(fox, 3)
(jumped, 2)
(and, 1)
(again, 1)

The basic concept of the “InMapper Combiner” data design pattern is illustrated by Figure 2-1.

Figure 2-1. InMapper Combiner Concept

Therefore, in the mapper transformation, using the InMapper Combiner, the three pairs of (fox, 1) are replaced by (fox, 3). Without the InMapper Combiner, for the word count, we generated seven (key, value) pairs, but with the InMapper Combiner we generated only four. If we have a lot of repeated words in our data, then the InMapper Combiner can help us achieve better performance by generating fewer intermediate (key, value) pairs.
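To make the idea concrete, here is a minimal PySpark sketch of per-record in-mapper combining for word count. The function name combine_per_record() and the use of Python’s collections.Counter are my own illustrative choices, not code from this chapter’s repository:

from collections import Counter

def combine_per_record(record):
    # count each word locally, then emit one (word, count) pair per unique word
    counts = Counter(record.lower().split())
    return list(counts.items())

# rdd is an RDD[String]; each record is one line of text
word_counts = rdd.flatMap(combine_per_record).reduceByKey(lambda a, b: a + b)

For the sample record above, combine_per_record() emits (fox, 3), (jumped, 2), (and, 1), and (again, 1) instead of seven (word, 1) pairs.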

To demonstrate the InMapper Combining data design pattern, we solve the problem of counting character frequencies for a set of documents. In simple terms, we want to find the frequency of each unique character in a given corpus. We will discuss the following solutions:

  • Basic MapReduce Design Pattern

  • InMapper Combining Per Record

  • InMapper Combining Per Partition

Basic MapReduce Design Pattern

To count the characters, for each record of input, we split it into a set of words, then we split each word into a character array, and finally we emit (key, value) pairs, where the key is a single character from the character array and the value is 1 (a frequency count of one for that character). Since we did not use any custom data types for emitting key-value pairs, we call this the “Basic” MapReduce design pattern. The reducer sums up the frequencies of each unique character. The problem with this solution is that for large data, we emit too many (key, value) pairs, which can increase network traffic and hence hurt the performance of the overall solution. Also, because of the many emitted (key, value) pairs, the “sort and shuffle” phase (grouping values for the same key) might take longer than expected.

Figure 2-2. Character Count: Basic MapReduce Algorithm

Given an RDD of String (as RDD[String]), a PySpark solution is provided below:

First we define a simple function, which accepts a single string record and returns a list of (key, value) pairs, where key is a character and value is 1 (the frequency of one character).

def mapper(rec):
    words = rec.lower().split()  1
    pairs = []   2
    for word in words:   3
        for c in word:   4
            pairs.append((c, 1))   5
        #end-for
    #end-for
    return pairs  6
#end-def
1

Tokenize a record into an array of words

2

Create an empty list as pairs

3

Iterate over words

4

Iterate over a single word

5

Add each character (c) as (c, 1) to pairs

6

Return list of (c, 1) for all characters in a given record

The mapper() function can be simplified as:

def mapper(rec):
    words = rec.lower().split()
    pairs = [(c, 1) for word in words for c in word]
    return pairs
#end-def
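As a quick sanity check, the simplified mapper() can be called directly (outside Spark); the sample string below is just an illustration:

>>> mapper("Fox ox")
[('f', 1), ('o', 1), ('x', 1), ('o', 1), ('x', 1)]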

Next, we use the mapper() function to count frequencies of unique characters:

# spark : an instance of SparkSession
rdd = spark.sparkContext.textFile("/dir/input")  1
pairs = rdd.flatMap(mapper)  2
frequencies = pairs.reduceByKey(lambda a, b: a+b)  3
1

Create an RDD[String] from input data

2

Map each record into collection of characters and flatten it as a new RDD[Character, 1]

3

Find frequencies of each unique character

The InMapper Combining data design pattern, discussed next, reduces the number of (key, value) pairs emitted by mappers, which in turn reduces network traffic and improves execution time.

InMapper Combining Per Record

This section introduces the InMapper Combining Per Record design pattern, also known as the Local Aggregation Per Record solution. It is very similar to the basic MapReduce algorithm, with the exception that for a given input record, we aggregate the frequencies of each character before emitting (key, value) pairs. Since the same character may be repeated many times in a given input record, in this solution we emit (key, value) pairs where the key is a unique character within a given record and the value is the aggregated frequency of that character in that record. Then we use reduceByKey() to aggregate all frequencies for each unique character. This solution uses “local aggregation” by leveraging the associativity and commutativity of the reduce function to combine values before reducers fetch them across the network. Typically, in MapReduce frameworks (such as Hadoop), combiners are viewed by the runtime as an optional local optimization; the correctness of the algorithm cannot depend on the combiner (which is why we use “forced in-mapper combining”). For example, for the following input record:

foxy fox jumped over fence

we will emit the following (key, value) pairs:

(f, 3)  (o, 3)  (x, 2)  (y, 1)
(j, 1)  (u, 1)  (m, 1)  (p, 1)
(e, 4)  (d, 1)  (v, 1)  (r, 1)
(n, 1)  (c, 1)

Given an RDD of String (RDD[String]), a PySpark solution is provided below:

First we define a simple function, which accepts a single string record (an element of the input RDD[String]) and returns a list of (key, value) pairs, where key is a unique character and value is the aggregated frequency of that character.

import collections  1
def local_aggregator(record):
    hashmap = collections.defaultdict(int)   2
    words = record.lower().split()  3
    for word in words:   4
        for c in word:   5
            hashmap[c] += 1  6
        #end-for
    #end-for
    print("hashmap=", hashmap)
#
    pairs = [(k, v) for k, v in hashmap.items()]  7
    print("pairs=", pairs)
    return  pairs  8
#end-def
1

The collections module provides high-performance container datatypes

2

Create an empty dictionary[String, Integer], defaultdict is a dict subclass that calls a factory function to supply missing values

3

Tokenize record into an array of words

4

Iterate over words

5

Iterate over each word

6

Aggregate characters

7

Flatten dictionary into a list of (character, frequency)

8

Return the flattened list of (character, frequency)

Next, we use the local_aggregator() function to count frequencies of unique characters:

input_path = '/tmp/your_input_data.txt'
rdd = spark.sparkContext.textFile(input_path)
pairs = rdd.flatMap(local_aggregator)
frequencies = pairs.reduceByKey(lambda a, b: a+b)

This solution will emit far fewer (key, value) pairs, which is an improvement over the previous solution. The drawback is that we instantiate and use a dictionary per input record (one per call to local_aggregator()). If there are very many records, this adds memory and object-creation overhead; but if the records are not too numerous, this solution will scale out. Another improvement of this solution is in the “sort and shuffle” phase: since we do not have too many (key, value) pairs before reduction, the sort and shuffle will execute faster than in the previous solution.

Next, I present the InMapper Combiner Per Partition, which is the most efficient of all the InMapper combiners. This is because we use the least amount of memory to summarize and combine (key, value) pairs per partition (which can comprise millions of data elements).

InMapper Combiner Per Partition

The InMapper Combiner Per Partition solution emits frequencies of characters per partition (rather than per record) of input data. Each partition of the input is a set of input records rather than a single input record. Therefore, there is another way to emit the frequency of characters: build a hash table of dict[Character, Integer] from the characters of a given “input partition” (as opposed to an “input record”; an “input partition” is a partition of the input comprised of a set of “input records”) and then emit the (key, value) pairs, where key is a dictionary key (a character) and value is its aggregated frequency. Therefore, the mapper phase will emit a set of (key, value) pairs, one per entry of the built hash table.

In this solution, we use a single hash table per input partition (rather than per input record) to keep track of the frequencies of all characters in a given partition. After the mapper (using PySpark’s mapPartitions() transformation) completes the given partition, we emit all key-value pairs from the frequencies table (the built hash table). Then the reducers sum up the frequencies and find the final count of characters. Note that the mapper using a hash table (the “InMapper Combiner” design pattern) is more efficient than the previously presented solutions because it does not emit too many key-value pairs, which improves network efficiency by sending less data. The solution scales out better than the previous two solutions: we use only a single hash table per input partition rather than a hash table per input record. This eliminates possible OOM problems. Even if we partition our input into thousands of partitions, this solution scales out very well.

It is clear that the “Basic” MapReduce algorithm generates a huge number of key-value pairs compared to the “InMapper Combining” design pattern. The “InMapper Combining” data structure representation is much more compact, since every entry of dict[Character, Integer] is equivalent to N basic key-value pairs, where N is the entry’s frequency value. The “InMapper Combining” approach also generates fewer and shorter intermediate keys, and therefore the execution framework has less sorting to perform. In using the “InMapper Combining” design pattern, you need to be careful about the size of the hash table to make sure it will not cause bottlenecks. For the character count problem, the size of the hash table for each mapper (per input partition) will be very small (since we have a limited number of unique characters); therefore there is no performance bottleneck in the presented solution.

For example, for the following “input partition” (as opposed to a single record):

foxy fox jumped over fence
foxy fox jumped
foxy fox

we will emit the following (key-value) pairs:

(f, 7)  (u, 2)  (v, 1)  (j, 2)  (y, 3)
(o, 7)  (m, 2)  (r, 1)  (d, 2)  (e, 5)
(x, 6)  (p, 2)  (n, 1)  (c, 1)

Given an RDD of String (RDD[String]), a PySpark solution is provided below:

First we define a simple function, which accepts a single input partition (comprised of many input records) and returns a list of (key, value) pairs, where key is a character and value is the aggregated frequency of that character.

from collections import defaultdict
#
def inmapper_combiner(partition_iterator):  1
    hashmap = defaultdict(int)   2
    for record in partition_iterator:  3
        words = record.lower().split()  4
        for word in words:   5
            for c in word:   6
                hashmap[c] += 1  7
            #end-for
        #end-for
    #end-for
    print("hashmap=", hashmap)
    #
    pairs = [(k, v) for k, v in hashmap.items()]  8
    print("pairs=", pairs)
    return  pairs  9
#end-def
1

partition_iterator represents a single input partition comprised of a set of records

2

Create an empty dictionary[String, Integer]

3

Get a single record from a partition

4

Tokenize record into an array of words

5

Iterate over words

6

Iterate over each word

7

Aggregate characters

8

Flatten dictionary into a list of (character, frequency)

9

Return the flattened list of (character, frequency)

Next, we use the inmapper_combiner() function to count frequencies of unique characters:

rdd = spark.sparkContext.textFile("/.../input")   1
pairs = rdd.mapPartitions(inmapper_combiner)   2
frequencies = pairs.reduceByKey(lambda a, b: a+b)
1

Create an RDD[String] from input file(s)

2

The mapPartitions() transformation returns a new RDD by applying a function to each input partition (as opposed to a single input record) of this RDD

This solution will emit far fewer (key, value) pairs, which is an improvement over the previous two solutions. It is very efficient because we instantiate and use a single dictionary per input partition (rather than per input record). Even if we have many mappers, this will not create an OOM problem. No matter the number of input partitions, this solution will scale out. Another improvement of this solution is in the “sort and shuffle” phase: since we do not have too many (key, value) pairs before reduction, the sort and shuffle will execute much faster than in the previous two solutions. The “InMapper Combining” design pattern reduces the amount of data that needs to be transferred between the mapper and reducer. However, the “InMapper Combining” algorithm may run into a problem if the number of unique keys grows too large for the associative array to fit in memory; if this is the case, you will have to revert to the basic pairs approach. Note that with “InMapper Combining”, the mappers generate only those key-value pairs that need to be shuffled across the network to the reducers.

In implementing the “InMapper Combining” design pattern, we used Spark’s powerful mapPartitions() transformation to transform each input partition into a single dict[Character, Integer], and then we aggregate/merge these into the final per-character frequencies. For character counting, this algorithm is efficient and faster than the other algorithms. When you need to create or aggregate a small amount of information from a large chunk of data (such as finding frequencies of characters, since the number of ASCII characters is very limited), the “InMapper Combining” design pattern makes sense, and the best way to implement it is Spark’s mapPartitions() transformation.
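As an illustrative variant (an assumption of mine, not code from this chapter’s repository), the same per-partition combining can be expressed with Python’s collections.Counter inside mapPartitions(); the behavior is equivalent to the inmapper_combiner() function above:

from collections import Counter

def inmapper_combiner_with_counter(partition_iterator):
    # one Counter per partition, not per record
    counter = Counter()
    for record in partition_iterator:
        for word in record.lower().split():
            counter.update(word)   # Counter.update() on a string counts its characters
    return list(counter.items())

frequencies = rdd.mapPartitions(inmapper_combiner_with_counter).reduceByKey(lambda a, b: a + b)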

We should also consider the scalability bottlenecks of the “InMapper Combining” design pattern. Using this pattern, we make the assumption that, at any point in time, each associative array per mapper partition (i.e., dict[Character, Integer]) is small enough to fit into memory; otherwise, memory paging will significantly impact performance. For character count, the size of the associative array (per mapper partition) is bounded by the number of unique characters. Therefore, for the character count problem, there are no scalability bottlenecks in using the “InMapper Combining” design pattern.

But what are the advantages and disadvantages of the InMapper Combining design pattern? The InMapper Combining has the following advantages and disadvantages:

  • Advantages

    • Creates fewer (key, value) pairs in the mappers

    • Far less sorting and shuffling of (key, value) pairs

    • Can make better use of combiners as optimizers

    • Scales out well

  • Disadvantages

    • More difficult to implement (you need some custom functions for handling each partition)

    • Underlying object (per mapper partition) is more heavyweight

    • Fundamental limitation in terms of size of the underlying object (for the character count problem: an associative array per mapper partition)

The InMapper Combining design pattern enables us to reduce the number of (key, value) pairs emitted by mappers. Hence, it can improve the overall performance of your data transformations.

Top-10

The Top-10 design pattern is illustrated by Figure 2-3.

Figure 2-3. Top-10 Design Pattern

Creating a Top-10 list is a common task in many data intensive operations. For example, we might ask the following questions:

  • What are the top-10 list of the URLs visited last day/week/month?

  • What are the top-10 list of items purchased from Amazon last week/month?

  • What are the top-10 list of search queries from Google in the last day/week/month?

  • What are the top-10 list of liked items from Facebook yesterday?

  • What are the top-10 list of cartoons of all time?

For example, for a search engine (such as Google), it is desirable to know the top-10 visited URLs per day, week, or month. If we have a table with two columns, (url, frequency), then finding the top-10 URLs in SQL is straightforward:

  SELECT url, frequency
     FROM url_table
        ORDER BY frequency DESC
           LIMIT 10;

Also, finding the Top-N (where N > 0) records in Spark is very easy: let R be an RDD of (String, Integer) pairs, where the key is a String (representing a URL) and the value is the frequency of the URL. Then we may use RDD.takeOrdered(N) to find the Top-N list. The general format of RDD.takeOrdered() is given below:

takeOrdered(N, key=None)
Description:
   Get the N elements from an RDD ordered in ascending
   order or as specified by the optional key function.

Assuming that N is an integer greater than 0, using RDD.takeOrdered() solves the Top-N list as:

N = 10

# Sort by keys (ascending):
RDD.takeOrdered(N, key = lambda x: x[0])

# Sort by keys (descending):
RDD.takeOrdered(N, key = lambda x: -x[0])

# Sort by values (ascending):
RDD.takeOrdered(N, key = lambda x: x[1])

# Sort by values (descending):
RDD.takeOrdered(N, key = lambda x: -x[1])
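For example, on a small pair RDD (the sample data below is made up for illustration), the top-3 pairs by frequency can be taken as follows:

pairs = [('url-1', 10), ('url-2', 8), ('url-3', 4), ('url-4', 25), ('url-5', 7)]
rdd = spark.sparkContext.parallelize(pairs)
# take the 3 pairs with the highest frequencies (values)
top3 = rdd.takeOrdered(3, key=lambda x: -x[1])
# top3 = [('url-4', 25), ('url-1', 10), ('url-2', 8)]

Note that negating the sort key (as in the descending examples above) works only when that component is numeric; string keys cannot be negated.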

For the sake of argument, assume that takeOrdered() does not have optimal performance for a very large data set. Then what other options do we have? We might use the Top-10 design pattern for the MapReduce paradigm. How does the Top-10 design pattern work in MapReduce? We’ll discuss that throughout the rest of this section.

Given a large set of { (key-as-string, value-as-integer) } pairs, finding a Top-N (where N > 0) list is a “design pattern”: an independent, reusable solution to the common Top-N problem that enables us to produce reusable code. For example, let key-as-string be a URL and value-as-integer be the number of times that URL was visited; then you might ask: what are the Top-10 URLs for the given data (that is, the 10 URLs with the highest frequencies)? This kind of question is common for this type of (key, value) pairs. Finding a “Top-10 list” falls under the “Filtering Pattern”: you filter out data and keep the Top-10 list. Also note that the Top-10 function is commutative and associative, and therefore using partitioners, combiners, and reducers will always produce correct results. Let T be a Top-10 function, and let a, b, and c be sets of values (such as frequencies) for the same key; then we can write (a small sketch follows the list below):

  • Commutative

   T(a, b) = T(b, a)
  • Associative

   T(a, T(b, c)) = T(T(a, b), c)
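As an informal illustration of these two properties, the following sketch (using Python’s heapq.nlargest(), which is my own choice and not code from this chapter) merges partial top-N lists of (url, frequency) pairs; grouping the merges in any order yields the same final list:

import heapq

def top_n(n, *lists_of_pairs):
    # merge any number of (url, frequency) lists and keep the n highest frequencies
    merged = [pair for pairs in lists_of_pairs for pair in pairs]
    return heapq.nlargest(n, merged, key=lambda x: x[1])

a = [('u1', 9), ('u2', 3)]
b = [('u3', 7)]
c = [('u4', 5), ('u5', 1)]
# top_n(3, a, top_n(3, b, c)) == top_n(3, top_n(3, a, b), c)
#   == [('u1', 9), ('u3', 7), ('u4', 5)]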

This section provides a complete PySpark solution for the Top-10 design pattern. Given a pair RDD of (String, Integer), the goal is to find the Top-10 list for the given RDD. In our solution, we assume that all keys are unique. If the keys are not unique, then you may use reduceByKey() (before finding the Top-10) to make all keys unique.

Our MapReduce solutions will generalize the “top 10 list” and will be able to find “top-N list” (for N > 0). For example, we will be able to find “top 10 cats”, “top 50 most visited web sites”, or “top 100 search queries of a search engine”.

Top-N Formalized

Let N be an integer with N > 0. Let L be a list of pairs of (T, Integer), where T can be any type (such as String, URL, …), L.size() = s, s > N, and the elements of L are:

L = { (K~1~, V~1~), (K~2~, V~2~), ..., (K~s~, V~s~) }

where K~i~ has data type T and V~i~ is an Integer (the frequency of K~i~). Let sort(L) be the list sorted by frequency in descending order:

sort(L) = { (A~1~, B~1~), (A~2~, B~2~), ..., (A~s~, B~s~) },  where B~1~ ≥ B~2~ ≥ ... ≥ B~s~

where each (A~j~, B~j~) is in L. Then the top-N of list L is defined as:

topN(L) = { (A~1~, B~1~), (A~2~, B~2~), ..., (A~N~, B~N~) }

For the Top-N solution, we will use the SortedDict class from the sortedcontainers package. Before providing the Top-N solution, let’s understand SortedDict. A SortedDict is a sorted mutable mapping whose keys are maintained in sorted order. The design of SortedDict is simple: it inherits from dict to store items and maintains a sorted list of keys. SortedDict keys must be hashable and comparable, and the hash and total ordering of keys must not change while they are stored in the SortedDict.

To implement the Top-N, we need a hash table data structure such as sortedcontainers.SortedDict() that maintains a total order on its keys (here the keys represent frequencies). The easy way to build a Top-N list in Python is to use SortedDict(), a dictionary that provides a total ordering on its keys; the dictionary is ordered according to the natural ordering of its keys. We will keep adding (frequency, url) entries to the SortedDict(), but keep its size at N (note that frequency is the key, since SortedDict() entries are sorted by key). When the size reaches N+1, we pop out the smallest frequency with SortedDict.popitem(0).

An example of SortedDict() usage is given below:

>>> from sortedcontainers import SortedDict
>>> sd = SortedDict({10: 'a', 2: 'm', 3: 'z', 5: 'b', 6: 't', 100: 'd', 20: 's'})
>>> sd
SortedDict({2: 'm', 3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 100: 'd'})
>>> sd.popitem(0)
(2, 'm')
>>> sd
SortedDict({3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 100: 'd'})
>>> sd[50] = 'g'
>>> sd
SortedDict({3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd.popitem(0)
(3, 'z')
>>> sd
SortedDict({5: 'b', 6: 't', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd[9] = 'h'
>>> sd
SortedDict({5: 'b', 6: 't', 9: 'h', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd.popitem(0)
(5, 'b')
>>> sd
SortedDict({6: 't', 9: 'h', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>>
>>> len(sd)
6

Next, I present a Top-10 solution using PySpark.

PySpark Solution

The PySpark solution is pretty straightforward: we partition the RDD into M partitions (where M > 1) and process each partition with the mapPartitions() transformation. Each mapper finds a local “top-N” list (for N > 0) and passes it to a single reducer. Then the single reducer finds the final “top-N” list from all the local “top-N” lists passed from the mappers. In general, in most MapReduce algorithms, having a single reducer is problematic and causes a performance bottleneck (one reducer on one server receives all the data, which can be a very large volume, while all other cluster nodes do nothing, so the entire load falls on a single node). Here, our single reducer will not cause a performance problem. Here is why: let’s assume that we have 5,000 partitions; then each mapper generates only 10 (key, value) pairs. Therefore, our single reducer gets only 50,000 records, which is not nearly enough data to cause a performance bottleneck.

Our high-level solution for the Top-10 design pattern is illustrated by Figure 2-4.

Figure 2-4. High Level Solution for Top-10 Design Pattern

The Top-10 algorithm is presented below. The input is partitioned into smaller chunks and each chunk is sent to a mapper. Each mapper creates a local top-10 list and emits it to be sent to the reducers. In emitting the mapper outputs, we use a single reducer key so that all mapper outputs are consumed by a single reducer.

Figure 2-5. Top-10 MapReduce Algorithm: for Unique Keys

Let spark be an instance of a SparkSession. This is how the Top-10 is solved by using the mapPartitions() transformation and a custom Python function named top10_design_pattern():

pairs = [('url-1', 10), ('url-2', 8), ('url-3', 4), ...]
rdd = spark.sparkContext.parallelize(pairs)
# mapPartitions(f, preservesPartitioning=False)
# Return a new RDD by applying a function
# to each partition of this RDD.
top10 = rdd.mapPartitions(top10_design_pattern)
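Note that rdd.mapPartitions(top10_design_pattern) produces only the local top-10 pairs of (frequency, url) from every partition; a final step is still needed to reduce them to a single global top-10. A minimal sketch of that step (collecting the small set of local winners to the driver) is:

# each local pair is (frequency, url); collect() is safe here because
# every partition contributes at most 10 pairs
final_top10 = sorted(top10.collect(), reverse=True)[:10]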

To complete the implementation, we present the top10_design_pattern() function, which finds top-10 for each partition (each partition is a set of (key, value) pairs):

from sortedcontainers import SortedDict
#
def top10_design_pattern(partition_iterator): 1
    sd = SortedDict()   2
    for url, frequency in partition_iterator:  3
        sd[frequency] = url  4
        if (len(sd) > 10):  5
            sd.popitem(0)
    #end-for
    print("local sd=", sd)
    #
    pairs = [(k, v) for k, v in sd.items()]  6
    print("top 10 pairs=", pairs)
    return  pairs  7
#end-def
1

The partition_iterator is an iterator for a single partition; an iterator over a set of (URL, frequency)

2

Create an empty SortedDict of (Integer, String)

3

Iterate over a set of (URL, frequency)

4

Put (frequency, URL) into SortedDict

5

Keep/limit the size of sorted dictionary to 10 (remove the lowest frequency)

6

Convert SortedDict (which is a local top-10) into a list of (k, v)

7

Return a local top-10 list for a single partition

This is how it works: the input is divided into a number of partitions. The number of partitions can be 8, 20, 100, 4,000, and so on; it depends on the size of the cluster and the available memory, disks, CPUs, and other resources. Note that a programmer can also set the number of partitions explicitly. Each partition is a set of (URL, frequency) pair elements. Each mapper accepts a partition of elements, where each element is a pair of (URL, frequency). After the mapper finishes creating a top-10 list as SortedDict[Integer, String], the function returns the local top-10 list. Note that we use a single dictionary (such as a SortedDict) per partition (and not per element of the source RDD).

Implementation in PySpark

The PySpark implementation is presented below. Note that this solution assumes that all URLs (i.e., keys) are unique. If the URLs are not unique, then you may use the reduceByKey() transformation to make all keys unique.

# cat /tmp/urls.txt
url-0001,12
url-0002,22
url-0003,1000
url-0004,199
...
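The driver itself is not reproduced in this excerpt; a minimal sketch, assuming the comma-separated (url, frequency) format shown above and the top10_design_pattern() function defined earlier, might look like this:

# spark : an instance of SparkSession
input_path = '/tmp/urls.txt'
rdd = spark.sparkContext.textFile(input_path)
# parse "url-0001,12" into ('url-0001', 12)
pairs = rdd.map(lambda line: (line.split(',')[0], int(line.split(',')[1])))
# local top-10 per partition, as (frequency, url) pairs
local_top10 = pairs.mapPartitions(top10_design_pattern)
# final top-10 from the small set of local winners
final_top10 = sorted(local_top10.collect(), reverse=True)[:10]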

How to Find Bottom-10

In the previous sections, we showed how to find the “top-10” list. To find the “bottom-10”, we just need to change one line of code:

  Replace the following

        # find top-10
        if (len(sd) > 10):  1
            sd.popitem(0)   2

  With

        # find bottom-10
        if (len(sd) > 10):   3
            sd.popitem(-1)   4
1

if size of SortedDict is larger than 10

2

then remove the lowest frequency URL from the dictionary

3

if size of SortedDict is larger than 10

4

then remove the highest frequency URL from the dictionary

Next, let’s discuss how to partition your input and RDDs. Partitioning an RDD is a combination of art and science. What is the right number of partitions for your cluster? There is no magic silver-bullet formula for calculating the proper number of partitions. It depends on the number of cluster nodes, the number of cores per server, and the amount of RAM available. My experience indicates that you need to set this by trial and error. The general rule of thumb for the total number of partitions is:

  2 * num_executors * cores_per_executor
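For example, assuming a hypothetical cluster with 10 executors and 8 cores per executor, this rule of thumb suggests:

num_executors = 10
cores_per_executor = 8
suggested_partitions = 2 * num_executors * cores_per_executor  # 160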

Also, when you want to create your first RDD, you can set the number of partitions as:

input_path = "/data/my_input_path"
desired_num_of_partitions = 16
rdd = spark.sparkContext.textFile(input_path, desired_num_of_partitions)

The preceding example creates an RDD[String] with 16 partitions. If you do not set the number of partitions explicitly, then Spark will set it to a default number (based on the input and the resources available in the cluster).

If your RDD has already been created with some number of partitions, then you may change the number of partitions by using the coalesce() function. Let rdd be an RDD[T]:

# rdd : RDD[T]
desired_number_of_partitions = 40
rdd2 = rdd.coalesce(desired_number_of_partitions)

The newly created rdd2 (as RDD[T]) will have 40 partitions (provided that rdd has at least 40 partitions; see the note after the definition below). According to the Spark documentation, the coalesce() function is defined as:

pyspark.RDD.coalesce:
coalesce(numPartitions, shuffle=False)

Description:
  Return a new RDD that is reduced into numPartitions partitions.
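Note that with shuffle=False (the default), coalesce() can only decrease the number of partitions; to increase the number of partitions, use repartition() (or coalesce() with shuffle=True), which performs a full shuffle:

# increase the number of partitions (this triggers a shuffle)
rdd3 = rdd.repartition(200)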

Next, I introduce the MinMax design pattern, whose goal is to find the minimum and maximum of a large set of numbers.

MinMax

Given a set of billions of numbers, the goal is to find the minimum, maximum, and count of all of the given numbers. MinMax is a “Numerical Summarization” design pattern. This pattern can be used in scenarios where the data you are dealing with (or want to aggregate) is numerical and can be grouped by specific fields. To understand the MinMax design pattern, I am going to present three different solutions with quite different performance characteristics.

Solution-1: Classic MapReduce

The naive approach will be to emit the following (key, value) pairs:

("min", number)
("max", number)
("count", 1)

for every number in the given input data set; then sort and shuffle will group all values by the three keys “min”, “max”, and “count”, and finally we may use a reducer to iterate over all the numbers and find the “min”, “max”, and “count”. The problem with this approach is that we have to move billions of (key, value) pairs across the network and then create three huge Iterable<Long> collections (assuming the numbers are of the Long data type). This is a possible solution, but it might not scale out and might have serious performance problems. In the reduction phase, this solution will not utilize the whole cluster, because there are only three unique keys: “min”, “max”, and “count”.
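A PySpark sketch of this naive approach (assuming, as in the input format shown later, that rdd is an RDD[String] with one or more comma-separated numbers per record) might look like the following; it works, but every single number is shipped across the network as (key, value) pairs and only three groups are built:

def emit_min_max_count(record):
    # emit ("min", n), ("max", n), ("count", 1) for every number in the record
    pairs = []
    for token in record.split(','):
        number = int(token)
        pairs.append(("min", number))
        pairs.append(("max", number))
        pairs.append(("count", 1))
    return pairs

# only three keys exist, so three huge groups of values are built
grouped = rdd.flatMap(emit_min_max_count).groupByKey()
result = grouped.map(lambda kv: (kv[0],
                                 min(kv[1]) if kv[0] == "min"
                                 else max(kv[1]) if kv[0] == "max"
                                 else sum(kv[1])))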

Solution-2: Sorting

The next solution is to sort all the numbers and then take the top (for “max”), the bottom (for “min”), and the count of the data set. If the performance is acceptable, then it is a good solution; but if you have a lot of numbers, the sorting time might not be acceptable.

Solution-3: Spark’s mapPartitions()

The final solution (the most efficient from a performance and scalability point of view) splits the data into N chunks (partitions) and then uses Spark’s mapPartitions() transformation to emit three (key, value) pairs from each partition:

   ("min", the-minimum-number-in-a-partition)
   ("max", the-maximum-number-in-a-partition)
   ("count", count-of-numbers-in-a-partition)

Finally, we find the min, max, and count over all partitions. This solution scales out very well: no matter how many partitions we have, it will work and will not create an OOM error. For example, if you have 500 billion numbers (assume one or more numbers per record) and partition them into 100,000 partitions, then each partition will get about 5 million numbers (in the worst case, 5 million records with one number per record). Each partition then emits three pairs as illustrated above. Finally, you find the min, max, and count of 100,000 x 3 = 300,000 values. Finding the min, max, and count of 300,000 values is trivial and will not cause any scalability problems at all.

The high-level solution is illustrated by Figure 2-6.

Figure 2-6. High Level Solution for MinMax Design Pattern

Solution-3 Input

We assume that our input records have the following format:

<number><,><number><,><number>...

Sample records are presented below:

10,345,24567,2,100,345,9000,765
2,34567,23,13,45678,900
...

PySpark Solution

Here, we present the PySpark solution for solving the MinMax problem. For details see the minmax_use_mappartitions.py program.

input_path = <your-input-path>
rdd = spark.sparkContext.textFile(input_path)  1
min_max_count = rdd.mapPartitions(minmax)  2
min_max_count_list = min_max_count.collect() 3
final_min_max_count = find_min_max_count(min_max_count_list) 4
1

Return a new RDD from the given input

2

Return an RDD of (min, max, count) from each partition by applying the minmax function

3

Collect (min, max, count) from all partitions as a list

4

Find the (final_min, final_max, final_count) by calling the find_min_max_count()
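The find_min_max_count() function is not shown in this excerpt; a minimal sketch, which folds the collected list of (min, max, count) triplets into a single triplet (and drops the (1, -1, 0) placeholder triplets produced for empty partitions, as described later in this section), might be:

def find_min_max_count(triplets):
    # drop placeholder triplets (min > max) produced by empty partitions
    valid = [(mn, mx, cnt) for (mn, mx, cnt) in triplets if mn <= mx]
    final_min = min(mn for (mn, mx, cnt) in valid)
    final_max = max(mx for (mn, mx, cnt) in valid)
    final_count = sum(cnt for (mn, mx, cnt) in valid)
    return (final_min, final_max, final_count)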

Next, minmax() function is presented:

def minmax(iterator):  1
#
    print("type(iterator)=", type(iterator))  2
#   ('type(iterator)=', <type 'itertools.chain'>)
#
    first_time = False 3
#
    for record in iterator:   4
        numbers = [int(n) for n in record.split(",")]  5
        if (first_time == False):   6
            # initialize count, min, max to the 1st record values
            local_min = min(numbers)
            local_max = max(numbers)
            local_count = len(numbers)
            first_time = True
        else:   7
            # update count, min, and max
            local_count += len(numbers)
            local_max = max(max(numbers), local_max)
            local_min = min(min(numbers), local_min)
    #end-for
    return [(local_min, local_max, local_count)]  8
#end-def
1

The iterator is a type of itertools.chain

2

Print the type of iterator (for debugging only)

3

Define some variables for return

4

Iterate the iterator (record holds a single record)

5

Tokenize input and build an array of numbers

6

If it’s the first record, then find out min, max, and count

7

If not the first record, then update local_min, local_max, and local_count

8

Finally return a triplet from each partition

What if some of the partitions are empty? Is it possible for a partition to be empty? Yes, this can happen for many reasons (according to the Spark documentation). What is an empty partition? An empty partition is a partition with no data at all. Therefore, we have to handle empty partitions gracefully (in case there are any).

Here I will show how to handle empty partitions. Error handling in Python is done through the use of exceptions that are caught in try blocks and handled in except blocks: if an error is encountered, execution of the try block is stopped and control is transferred to the except block.

def minmax(iterator):  1
#
    print("type(iterator)=", type(iterator))  2
#   ('type(iterator)=', <type 'itertools.chain'>)
#
    try:
        first_record = next(iterator)   3
    except StopIteration:  4
        return [(1, -1, 0)] # WHERE min > max to filter out later
#
    # initialize count, min, max to the 1st record values
    numbers = [int(n) for n in first_record.split(",")]  5
    local_min = min(numbers)
    local_max = max(numbers)
    local_count = len(numbers)
#
    for record in iterator:   6
        numbers = [int(n) for n in record.split(",")]
        # update min, max, count
        local_count += len(numbers)
        local_max = max(max(numbers), local_max)
        local_min = min(min(numbers), local_min)
#   end-for
    return [(local_min, local_max, local_count)]  7
1

The iterator is a type of itertools.chain

2

Print the type of iterator (for debugging only)

3

Try to get the first record from a given iterator, if successful, then the first_record is initialized to the first record of a partition

4

If you are here, then it means that the partition is empty, return a fake triplet

5

Set min, max, and count from the first record

6

Iterate the iterator for 2nd, 3rd, … records (record holds a single record)

7

Finally return a triplet from each partition

How should we test the handling of empty partitions? The program minmax_force_empty_partitions.py (in the source code of Chapter 12 on GitHub) forces the creation of empty partitions and handles them gracefully. How do you force Spark to create empty partitions? Set the number of partitions higher than the number of input records. For example, if your input has N records, set the number of partitions to N+3, so that at least some partitions will be empty.

The Composite Pattern and Monoids

This section explores the concepts of the “composite pattern” and monoids and shows how to use them in the context of Spark and PySpark.

Composite Pattern

What is the “composite pattern”? In a nutshell, the composite pattern is a structural design pattern (also called a partitioning design pattern) that can be used when a group of objects can be treated the same way as a single object in that group. You may use the composite pattern to create hierarchies of objects and groups of objects, quickly turning the structure into a tree with leaves (objects) and composites (sub-groups). The composite pattern in UML notation is illustrated in Figure 2-7.

Figure 2-7. Composite Pattern Diagram

According to Wikipedia: “in software engineering, the composite pattern is a partitioning design pattern. The composite pattern describes a group of objects that is treated the same way as a single instance of the same type of object. The intent of a composite is to “compose” objects into tree structures to represent part-whole hierarchies. Implementing the composite pattern lets clients treat individual objects and compositions uniformly.”

For example, the composite pattern can be illustrated with a tree data structure. Another example of the composite pattern is adding a set of numbers, which is illustrated in Figure 2-8: the numbers are the leaves and the addition operators are the composites.

Figure 2-8. Composite Pattern Example: Addition

Next, I discuss the concept of monoids in the context of composite pattern.

Monoids

We have already seen the use of monoids in reduction transformations (Chapter 4). Here we will look at monoids in the context of the composite pattern. From the composite pattern definition, it should be easy to see that there is a commonality between monoids and composite patterns. The composite pattern is a design pattern that is commonly used in programming languages such as Java, Python, and Scala, and also in big data for composing (for example, with addition and concatenation operators) and aggregating a set of data points.

To observe the commonality between monoids and composite patterns, let’s take a look at the definition of “monoid” (source: Wikipedia):

    In abstract algebra, a branch of mathematics, a monoid is an
    algebraic structure with a single associative binary operation
    and an identity element. Monoids are studied in semigroup
    theory, because they are semigroups with identity. Monoids
    occur in several branches of mathematics; for instance, they
    can be regarded as categories with a single object. Thus, they
    capture the idea of function composition within a set. In fact,
    all functions from a set into itself form naturally a monoid
    with respect to function composition. Monoids are also commonly
    used in computer science, both in its foundational aspects and
    in practical programming.

An application of monoids in computer science is the so-called “MapReduce” programming model. The MapReduce programming model consists of three functions: map(), combine(), and reduce(). These are very similar to Spark’s map() and flatMap() transformations (combine() is an optional operation) and reduce() transformations. Given a dataset, map() maps arbitrary data to elements of a specific monoid, combine() aggregates/folds data at the local level (on worker nodes in the cluster), and finally reduce() aggregates/folds those elements so that in the end we produce just one element.

So in terms of programming language semantics, a monoid is just an interface with one abstract value and one abstract method. The abstract method of a monoid is the append operation (it can be an addition operator on integers or a concatenation operator on string objects). The “abstract value” of a monoid is the identity value; for example, for adding a set of integers the identity value is zero, and for concatenating strings the identity value is the empty string (a string of length zero). Note that the identity value is defined as the value you can append to any value and always get the original value back unmodified. For example, the identity value for collection data structures is the empty collection, because appending a collection to an empty collection typically produces the same collection unmodified.
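A minimal Python sketch of this “interface” view of a monoid (my own illustration, not code from this chapter) pairs an identity value with an associative append operation:

# addition monoid over integers: identity 0, append is +
add_monoid = (0, lambda a, b: a + b)

# concatenation monoid over strings: identity '', append is +
concat_monoid = ('', lambda a, b: a + b)

def fold(monoid, values):
    # reduce a sequence to one element using the monoid's identity and append
    identity, append = monoid
    result = identity
    for v in values:
        result = append(result, v)
    return result

# fold(add_monoid, [1, 2, 3])      -> 6
# fold(concat_monoid, ['a', 'b'])  -> 'ab'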

In the paper Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms, Jimmy Lin clearly presents monoids as a design principle for efficient MapReduce algorithms. But what is a monoid, what properties define it, and how does it aid the MapReduce paradigm? Following abstract algebra, “a monoid is an algebraic structure with a single associative binary operation and an identity element”. Jimmy Lin shows that when your MapReduce operations (map(), reduce(), and the reduceByKey() transformation in Spark) are not monoids, it is very hard (and may be impossible) to use “combiners” efficiently. Next we will briefly review MapReduce’s combiners and abstract algebra’s monoids and see how they are related to each other.

Also, David Saile states that “recall that a monoid is an algebraic structure with a single associative binary operation and an identity element. For example, the natural numbers N (the term “natural numbers” refers either to the set of positive integers {1, 2, 3, ...} or to the set of non-negative integers {0, 1, 2, 3, ...}) form a monoid under addition with identity element zero. In classic MapReduce, the mapper is not constrained, but the reducer is required to be (the iterated application of) an associative operation. Recent research argued that reduction is in fact monoidal in known applications of MapReduce. That is, reduction is indeed the iterated application of an associative operation f() with a unit u. In the case of the word-occurrence count example, reduction iterates addition + with 0 as unit. The parallel execution schedule may be more flexible if commutativity is required in addition to associativity.”

In the MapReduce framework, the combiner (an optional plug-in component) is a “local-reduce” process that operates only on data generated by one server. Successful use of combiners reduces the amount of intermediate data generated by the mappers on a given server (which is why a combiner is also called a local reducer). Combiners can be used as a MapReduce optimization to reduce the network traffic (by decreasing the size of the transient data sent as (key, value) pairs) between mappers and reducers. Typically, combiners have the same interface as reducers. A combiner must have the following characteristics:

  • A combiner receives as input all the data emitted by the mapper instances on a given server (this is called local aggregation)

  • A combiner’s output is sent to the reducers (some programmers call this a local server reduction)

  • A combiner must be side-effect free; combiners may run an indeterminate number of times

  • A combiner must have the same input and output key types (see the example below)

  • A combiner must have the same input and output value types (see the example below)

  • A combiner runs in memory after the map phase

Therefore, a combiner skeleton should be defined as:

# key: as KeyType
# values: as Iterable<ValueType>
def combine(key,  values):
   ...
   # use key and values to create new_key and new_value
   new_key = <a-value-of-KeyType>
   new_value = <a-value-of-ValueType>
   ...
   return (new_key, new_value)
#end-def

This template indicates that the (key, value) pairs generated by a combiner have to match the (key, value) pairs received (as input) by the reducers. For example, if the mapper outputs (T~1~, T~2~) pairs (key of type T~1~ and value of type T~2~), then the combiner has to emit (T~1~, T~2~) pairs as well.

MapReduce/Hadoop does not have an explicit combine() function; in Hadoop, we implement a combiner with the same interface as a reducer and plug it in with Job.setCombinerClass().

Furthermore, Jimmy Lin concludes that “one principle for designing efficient MapReduce algorithms can be precisely articulated as follows: create a monoid out of the intermediate value emitted by the mapper. Once we “monoidify” the object, proper use of combiners and the in-mapper combining techniques becomes straightforward.” (source: Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms)

Some programming languages have direct support for monoids. For example, the Haskell programming language has a direct support for monoids. In Haskell, “a monoid is a type with a rule for how two elements of that type can be combined to make another element of the same type.”

Definition of Monoid

A monoid is a triple (S, f, e), where

  • S is a set

    • S is called the underlying set of the monoid

  • f : S x S → S

    • f is a mapping called the binary operation of the monoid

  • e ∈ S

    • e is the identity element of the monoid

A monoid with binary operation + (note that here the binary operation is denoted by + and does not necessarily mean mathematical addition) satisfies the following three axioms (note that f(a, b) = a + b):

  • Closure: for all a, b in S, the result of the operation (a + b) is also in S.

  • Associativity: for all a, b and c in S, the following equation holds:

((a + b) + c) = (a + (b + c))
  • Identity element: there exists an element e in S, such that for all elements a in S, the following two equations hold:

e + a = a
a + e = a

And in mathematical notation we can write these as

  • Closure:

∀ a,b ∈ S: a + b ∈ S

  • Associativity:

∀ a, b, c ∈ S: ((a + b) + c) = (a + (b + c))
  • Identity element:

∃ e ∈ S: ∀ a ∈ S: e + a = a and a + e = a

A monoid might have other properties: the monoid operator might (but is not required to) obey additional properties, such as:

  • idempotency:

∀ a ∈ S: a + a = a
  • commutativity:

∀ a, b ∈ S: a + b = b + a

How to form a Monoid?

To form a monoid, first we need a type S, which defines a set of values such as the integers: {0, -1, +1, -2, +2, ...}. The second component is a binary function:

+ : S x S → S

Then we need to make sure that for any two values x in S and y in S we get a result that is also in S, the combination of x and y:

x + y : S

For example, let S be the set of integers; then the binary operation may be addition (+), multiplication (*), or division (/). Finally, as the third and most important ingredient, we need the binary operation to obey a set of laws (the axioms above). If it does, then S together with the binary operation (+) is called a monoid. We say: (S, +, e) is a monoid, where e in S is the identity element (such as 0 for addition and 1 for multiplication). Also note that the binary division operator (/) over the real numbers does not form a monoid, because it is not associative:

    ((12 / 4) / 2) not equal (12 / (4 / 2))
    ((12 / 4) / 2) = (3 / 2) = 1.5
    (12 / (4 / 2)) = (12 / 2) = 6.0

In a nutshell, monoids capture the notion of combining arbitrarily many things into a single thing, together with a notion of an empty thing called the identity. One simple example is addition on the non-negative integers {0, 1, 2, 3, ...}: the addition function + allows us to combine arbitrarily many numbers into a single number, the sum, and the identity is the number zero. Another example is string concatenation: the concatenation operator allows us to combine arbitrarily many strings into a single string, and the identity value is the empty string.

Monoidic and Non-Monoidic Examples

To understand the concept of monoids, I have listed some monoid and non-monoid examples below. For example, to use Spark’s reduceByKey() effectively, you must make sure that the reduction function (together with its value type) forms a monoid, since Spark uses combiners with the reduceByKey() transformation.

Maximum over Set of Non-Negative Integers

The set S = {0, 1, 2, ...} is a commutative monoid for the MAX (maximum) operation, whose identity element is 0 (zero).

MAX(a, MAX(b, c)) = MAX(MAX(a, b), c)
MAX(a, 0) = a
MAX(0, a) = a
MAX(a, b) in S

Subtraction over Set of Integers

The subtraction operator (-) over the set of integers does not form a monoid; the operation is not associative:

(1 - 2) - 3 = -4
1 - (2 - 3) = 2

Addition over Set of Integers

The addition operator (+) over the set of integers forms a monoid; the operation is commutative and associative, and the identity element is 0:

(1 + 2) + 3 = 6
1 + (2 + 3) = 6
n + 0 = n
0 + n = n

We can formalize this monoid as follows (e(+) denotes the identity element):

S = {0, -1, +1, -2, +2, -3, +3, ...}
e(+) = 0  (note: identity element is 0)
f(a, b) = f(b, a) = a + b

Multiplication over Set of Integers

The natural numbers, N = {0, 1, 2, 3, ...}, form a commutative monoid under multiplication (with identity element one).

Mean over Set of Integers

On the other hand, the natural numbers, N = {0, 1, 2, 3, ...}, do not form a monoid under the mean (average) function. The following example shows that the mean of means of arbitrary subsets of a set of values is not the same as the mean of the whole set of values:

MEAN(1, 2, 3, 4, 5)
-- NOT EQUAL --
MEAN( MEAN(1,2,3), MEAN(4,5) )

MEAN(1, 2, 3, 4, 5) = (1+2+3+4+5)/5
                    = 15/5
                    = 3

MEAN( MEAN(1,2,3), MEAN(4,5) ) = MEAN(2, 4.5)
                               = (2 + 4.5)/2
                               = 3.25

Therefore, if you want to find the average of values for a pair RDD (as RDD[(key, integer)]), you may not use the following transformation (which may yield wrong values due to partitioning):

# let rdd be a RDD[(key, integer)]
average_per_key = rdd.reduceByKey(lambda x, y: (x+y)/2)

The correct way to find the average per key is to make the value a monoid:

# let rdd be a RDD[(key, integer)]
# create value as (sum, count) pair: this makes a monoid
rdd2 = rdd.mapValues(lambda n: (n, 1))
# find (sum, count) per key
sum_count = rdd2.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
# now, given (sum, count) per key, find the average per key
average_per_key = sum_count.mapValues(lambda x: x[0]/x[1])

Non-Commutative Example

For a non-commutative example, consider the collection of all binary strings: an element is a finite ordered sequence of 0s and 1s. The binary operation is concatenation: e.g., concat(1011, 001001) = 1011001001. The identity element is the “empty string” (a string of size 0). Therefore, binary strings under concatenation form a monoid (which is not commutative).

Median over Set of Integers

The natural numbers do not form a monoid under the MEDIAN function:

MEDIAN(1, 2, 3, 5, 6, 7, 8, 9)
-- NOT EQUAL --
MEDIAN( MEDIAN(1,2,3), MEDIAN(5,6,7,8,9) )

MEDIAN(1, 2, 3, 5, 6, 7, 8, 9)
  = (5 + 6) / 2
  = 11 / 2
  = 5.5

MEDIAN( MEDIAN(1,2,3), MEDIAN(5,6,7,8,9) )
  = MEDIAN(2, 7)
  = (2 + 7) / 2
  = 9 / 2
  = 4.5

Concatenation over Lists

A good example of a monoid is lists of objects. Lists with concatenation (+) and the empty list ([]) form a monoid: for any list L, we can write:

Let L be a list
L + [] = L
[] + L = L

Also, note that the concatenation function is associative. Given two lists, say [1,2,3] and [7,8], you can join them together using + to get [1,2,3,7,8]. There is also the empty list []. Using + to combine [] with any list gives you back the same list; for example, [] + [1,2,3] = [1,2,3] and [1,2,3] + [] = [1,2,3].

Union and Intersection over Integers

Sets of integers under union (with the empty set as identity) or under intersection (with the set of all integers as identity) form a monoid.

Matrix Example

Let N = {1, 2, 3, ...} and let m, n ∈ N. Then the set of m x n matrices with integer entries, written as Z^m x n^, satisfies the properties that make it a monoid under addition:

  • closure is guaranteed by the definition

  • the associative property is guaranteed by the associative property of its elements; and

  • the additive identity is 0, the zero matrix

What have we learned from these examples and monoid concepts? Spark’s reduceByKey() is an efficient transformation that merges the values for each key using an associative and commutative reduce function. Therefore, we have to make sure that reduceByKey()’s reduce function (together with its value type) forms a monoid; otherwise, you might not get correct reduction results.

Not a Monoid Example

Given a large number of (key, value) pairs where the keys are strings and the values are integers, the goal is to find the average of all the values by key. In SQL, this is accomplished as follows (assuming that our table, named mytable, has key and value columns):

  • Select All Data

SELECT key, value FROM mytable

key    value
---    -----
key1   10
key1   20
key1   30
key2   40
key2   60
key3   20
key3   30
  • Select All Data and GROUP BY key

SELECT key, AVG(value) as avg FROM mytable GROUP BY key

key    avg
---    ---
key1   20
key2   50
key3   25

Here is the first version of the MapReduce algorithm, where the mapper does not generate monoid outputs for the mean/average function.

  • Mapper function:

# key: a string object
# value: a long associated with the key
map(key, value) {
   emit(key, value);
}
  • Reducer function:

# key: a string object
# values: a list of long values associated with the key
reduce(key, values) {
   sum = 0
   count = 0
   for (i : values) {
      sum = sum + i
      count = count + 1
   }
   average = sum / count
   emit(key, average)
}

Two observations about this first version of the MapReduce algorithm:

  • The algorithm is not very efficient: every (key, value) pair is sent across the network, so the “sort and shuffle” phase of the MapReduce framework does too much work.

  • We cannot use the reducer as a combiner: as shown above, the mean of means of arbitrary subsets of a set of values is not the same as the mean of the whole set of values.

Note that using combiners makes Spark, MapReduce, and distributed algorithms more efficient by reducing network traffic (provided that the combiner performs sufficient aggregation) and by reducing the load on the “sort and shuffle” phase of the MapReduce framework. Now the question is: how can we make our reducer work as a combiner? The answer is to change the output of the mapper so that it is a monoid. Once your mapper outputs monoid values, combiners and reducers will behave correctly.

Next, let’s look at a monoid example.

Monoid Example

In this section, I revise the mapper to generate (key, value) pairs where the key is a string and the value is a pair (sum, count), which has the monoid property. The (sum, count) data structure is a monoid whose identity element is (0, 0). The proof is given below, followed by a quick Python check of these laws:

Monoid type is (N, N) where N = {set of integers}

Identity element is (0, 0):
(sum, count) + (0, 0) =  (sum, count)
(0, 0) + (sum, count) =  (sum, count)

Let a = (sum1, count1), b = (sum2, count2), c = (sum3, count3)
Then associativity holds:
(a + (b + c)) = ((a + b) + c)

+ is the binary function:
a + b = (sum1+sum2, count1+count2) in (N, N)
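A quick Python check of these laws (the sample pairs are hard-coded purely for illustration):

def add_pairs(a, b):
    # (sum1, count1) + (sum2, count2) = (sum1+sum2, count1+count2)
    return (a[0] + b[0], a[1] + b[1])

IDENTITY = (0, 0)
a, b, c = (6, 3), (9, 2), (4, 1)
# identity laws
assert add_pairs(a, IDENTITY) == a and add_pairs(IDENTITY, a) == a
# associativity
assert add_pairs(add_pairs(a, b), c) == add_pairs(a, add_pairs(b, c))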

Now, let’s write a mapper (for the MapReduce paradigm) that emits this monoid data type:

# key a string object
# value a long data type associated with key
# emits (key, (sum, count))
map(key, value) {
   emit (key, (value, 1))
}

As you can see, the key is the same as before, but the value is a (sum, count) pair. Now the output of the mapper is a monoid whose identity element is (0, 0). The element-wise sum operation can be performed as:

 element1 = (key, (sum1, count1))
 element2 = (key, (sum2, count2))

 ==> values for the same key are reduced as:

  element1 + element2
    = (sum1, count1) + (sum2, count2)
    = (sum1+sum2, count1+count2)

Now the mean function will be calculated correctly, since the mappers output monoids. Imagine the values for a single key are {1, 2, 3, 4, 5}, where {1, 2, 3} goes to partition-1 and {4, 5} goes to partition-2:

The true mean:
MEAN(1, 2, 3, 4, 5)
  = (1+2+3+4+5) / 5
  = 15 / 5
  = 3

Partition-1 (partial aggregation):
  values {1, 2, 3} => (sum, count) = (6, 3)

Partition-2 (partial aggregation):
  values {4, 5} => (sum, count) = (9, 2)

Merging partitions:
  (6, 3) + (9, 2) = (15, 5)
  final mean = 15 / 5
             = 3

The revised algorithm, where the mapper outputs (defined above) are monoids, is presented below (for a given pair (sum, count), pair.1 refers to sum and pair.2 refers to count):

# key a string object
# values is a list of pairs as: [(s1, c1), (s2, c2), ...]
combine(key, values) {
   sum = 0
   count = 0
   for (pair : values) {
      sum += pair.1
      count += pair.2
   }
   emit (key, (sum, count))
}

The reduce() function is presented below:

# key: a string object
# values: a list of pairs as [(s1, c1), (s2, c2), ...]
reduce(key, values) {
   sum = 0
   count = 0
   for (pair : values) {
      sum += pair.1
      count += pair.2
   }
   average = sum / count
   emit (key, average)
}

Therefore, in order to guarantee that your combiners execute correctly, you must make sure that your mappers generate monoid data types.

PySpark Implementation of the Monoidized Mean

The goal of this section is to provide a monoidized mean solution, meaning that combiners can be used to assist in aggregating values. To compute the mean of the numbers for each key, we could group all values per key (using Spark’s groupByKey() transformation), then find the sum and divide by the count. This is not an optimal solution, because groupByKey() may cause out-of-memory (OOM) problems.

For the monoidized mean solution, for a given (key, number) we will emit (key, (number, 1)). The value associated with a key denotes a (sum, frequency) pair. Earlier, I demonstrated that using (sum, frequency) values enables us to use combiners and reducers for proper calculation of the mean function. With this setup, we can use the reduceByKey() transformation, which is a very efficient reducer. How do we reduce? Consider the following two pairs for the same key (in an earlier section, I showed that (sum, count) is a monoid data type for finding the average of values, therefore reduceByKey() will work correctly):

(key, value1) = (key, (sum1, count1))
(key, value2) = (key, (sum2, count2))

This is how reduction function will work:

value1 + value2 =
(sum1, count1) + (sum2, count2) =
(sum1+sum2, count1+count2)

Once the reduction (by reduceByKey()) is done, we need an additional mapping step (mapValues()) to find the average by dividing the sum by the count (as shown in the following subsections).

Input

The input record format will be

Input Record Format:
  <key-as-string><,><value-as-integer>

Examples:
  key1,100
  key2,46
  key1,300

PySpark Solution

The solution consists of the following four high-level steps:

  • Step-1: Read input and create the first RDD as RDD[String]

  • Step-2: Apply map() to create RDD[key, (number, 1)]

  • Step-3: Perform reduction by reduceByKey(), which will create RDD[key, (sum, count)]

  • Step-4: Apply mapValues() to create the average RDD as RDD[key, (sum / count)]

The complete PySpark program (average_monoid_driver.py) is listed below.

First, we need two simple Python functions to help us in using Spark transformations.

The first function, create_pair(), accepts a String object of the form “key,number” and returns a (key, (number, 1)) pair.

# record as String of "key,number"
def create_pair(record):
  tokens = record.split(",")
  key = tokens[0]
  number = int(tokens[1])
  return (key, (number, 1))
# end-def

The second function, add_pairs(), accepts two tuples (sum1, count1) and (sum2, count2) and returns their element-wise sum (sum1+sum2, count1+count2).

# a = (sum1, count1)
# b = (sum2, count2)
def add_pairs(a, b):
    # sum = sum1+sum2
    sum = a[0] + b[0]
    # count = count1+count2
    count = a[1] + b[1]
    return (sum, count)
# end-def

Here is the complete PySpark solution:

from __future__ import print_function  1
import sys  2
from pyspark.sql import SparkSession  3

if len(sys.argv) != 2:   4
    print("Usage: average_monoid_driver.py <file>", file=sys.stderr)
    exit(-1)

spark = SparkSession.builder.getOrCreate()  5


#  sys.argv[0] is the name of the script.
#  sys.argv[1] is the first parameter
input_path = sys.argv[1]   6
print("input_path: {}".format(input_path))

# read input and create an RDD<String>
records = spark.sparkContext.textFile(input_path)  7

# create a pair of (key, (number, 1)) for "key,number"
key_number_one = records.map(create_pair)  8

# aggregate the (sum, count) of each unique key
sum_count = key_number_one.reduceByKey(add_pairs)  9

# create the final RDD as RDD[key, average]
averages = sum_count.mapValues(lambda pair: pair[0] / pair[1])  10
print("averages.take(5): ", averages.take(5))

# done!
spark.stop()
1. Import the print() function.

2. Import sys (system-specific parameters and functions).

3. Import SparkSession from the pyspark.sql module.

4. Make sure that we have 2 parameters in the command line.

5. Create an instance of SparkSession using the builder pattern (SparkSession.builder).

6. Define the input path (this can be a file or a directory containing any number of files).

7. Read the input and create the first RDD as RDD[String], where each element has the format “key,number”.

8. Create the key_number_one RDD as (key, (number, 1)).

9. Aggregate (sum1, count1) with (sum2, count2) and create (sum1+sum2, count1+count2) as values.

10. Apply the mapValues() transformation to find the final average per key.

Conclusion on Using Monoids

We observed that in the MapReduce paradigm (which is a foundation of Hadoop, Spark, Tez, …), if your mapper generates monoids, then you can use combiners for optimization and efficiency (combiners reduce network traffic and make MapReduce’s “sort and shuffle” phase more efficient by processing less data). We also showed how to monoidify a MapReduce algorithm; the challenge is to monoidify our own algorithms. In general, combiners can be used when the function you want to apply is both commutative and associative (the properties of a commutative monoid). For example, classic word count is a monoid over the set of integers with the + operation, so a combiner can be used there (a minimal PySpark sketch is shown below); but the mean function over a set of integers (which is not associative, as shown in the counterexample above) does not form a monoid. Therefore, if combiners are used properly, they significantly cut down the amount of data shuffled from the mappers to the reducers.
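Here is a minimal PySpark word count sketch; it assumes an existing SparkSession named spark, and the input path "sample_input.txt" is hypothetical. Because integer addition is associative and commutative, reduceByKey() can safely combine partial counts on the map side:

# read the input; the path is a placeholder
lines = spark.sparkContext.textFile("sample_input.txt")
# split each line into words
words = lines.flatMap(lambda line: line.split())
# (word, 1) pairs reduced with +, a monoid over the integers
counts = words.map(lambda word: (word, 1)) \
              .reduceByKey(lambda x, y: x + y)
print(counts.take(5))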

Functors and Monoids

Now that we have seen the definition and use of monoids in the MapReduce framework, we can even apply higher-order functions (like “functors”) to monoids. A functor is an object that acts as a function (we can say that it is a function and an object at the same time). Java 8 (JDK 8) has direct support for this idea in the form of lambdas (for details, see the Lambda project). Earlier versions (JDK 6 and JDK 7) have no direct concept of functors, because functions are not first-class objects in Java; this means that you cannot pass a function name as an argument to another function. There is a simple way to simulate functors in Java by defining an interface with a single method (this is a very simplistic simulation):

public interface FunctorSimulation<T1, T2> {
    T2 apply(T1 input);
}

First, we present a “functor” on a monoid with a simple example. Let MONOID = (T, e, plus) be a monoid, where T is a type (a set of values), e is the identity element, and plus is the binary “+” function:

MONOID = {
   type T
   val e : T
   val plus : T x T -> T
}

Then we define a functor Prod as

functor Prod (M : MONOID) (N : MONOID) = {
   type t = M.T * N.T
   val e = (M.e, N.e)
   fun plus((x1,y1), (x2,y2)) = (M.plus(x1,x2), N.plus(y1,y2))
}

Then we can define other functors such as Square as:

functor Square (M : MONOID) : MONOID = Prod M M

Also, a functor between two monoids can be defined as follows. Let (M1, f1, e1) and (M2, f2, e2) be monoids. A functor:

F : (M1, f1, e1) → (M2, f2, e2)

is specified by an object map (monoids are categories with a single object) and an arrow map F : M1 → M2, such that the following conditions hold:

∀a,b ∈ M1, F(f1(a,b)) = f2(F(a), F(b))

F(e1) = e2

A functor between two monoids is just a “monoid homomorphism.” For example, for the String data type, the function Length(), which counts the number of letters in a string, is a monoid homomorphism (from strings under concatenation to integers under addition).

  • Length("") = 0 (length of an empty string is 0)

  • If Length(x) = m and Length(y) = n, then concatenation x + y of strings has m + n letters, for example:

Length("String" + "ology")
    = Length("Stringology")
    = 11
    = 6 + 5
    = Length("String") + Length("ology")

Therefore, having mappers create monoids guarantees that the reducers can take advantage of combiners effectively and correctly.

Next, we take a look at the map-side join design pattern, which can reduce the time needed to join two large data sets.

Map-Side Join

A join operation is used to combine rows from two (or more) tables, based on a common column (or columns) between them. A join is an expensive operation, and since big data engines (such as Hadoop and Spark) do not support indexing of data tables, it can be even more expensive. Here, I introduce the map-side join, which can reduce the cost of a join between two tables. A map-side join is a process where two data sets are joined by the mapper alone, rather than using the actual join function (which requires a combination of mappers and reducers). What are the advantages of using a map-side join? They are as follows:

  • Map-side join might help in minimizing the cost that is incurred for sorting and merging in the shuffle and reduce stages.

  • Map-side join might help in improving the performance of the task by decreasing the time to finish the task.

Using SQL terminology, a JOIN clause is used to combine rows from two (or more) tables, based on a related column between them. Typically, a SQL JOIN is an expensive operation (unless you have built proper indexes). In general, joins on large data sets are expensive, but very rarely do you want to join the entire contents of large table A with the entire contents of large table B.

To understand the map-side join, assume that we have two tables (in a MySQL database): EMP and DEPT, as defined below. Let’s see how a join works between these two tables:

mysql> use testdb;
Database changed

mysql> select * from emp;
+--------+----------+---------+
| emp_id | emp_name | dept_id |
+--------+----------+---------+
|   1000 | alex     | 10      |
|   2000 | ted      | 10      |
|   3000 | mat      | 20      |
|   4000 | max      | 20      |
|   5000 | joe      | 10      |
+--------+----------+---------+
5 rows in set (0.00 sec)

mysql> select * from dept;
+---------+------------+---------------+
| dept_id | dept_name  | dept_location |
+---------+------------+---------------+
|      10 | ACCOUNTING | NEW YORK, NY  |
|      20 | RESEARCH   | DALLAS, TX    |
|      30 | SALES      | CHICAGO, IL   |
|      40 | OPERATIONS | BOSTON, MA    |
|      50 | MARKETING  | Sunnyvale, CA |
|      60 | SOFTWARE   | Stanford, CA  |
+---------+------------+---------------+
6 rows in set (0.00 sec)

Next we join (as an INNER JOIN) these two tables on the dept_id key:

mysql> select e.emp_id, e.emp_name, e.dept_id, d.dept_name, d.dept_location
         from emp e, dept d
             where e.dept_id = d.dept_id;
+--------+----------+---------+------------+---------------+
| emp_id | emp_name | dept_id | dept_name  | dept_location |
+--------+----------+---------+------------+---------------+
|   1000 | alex     | 10      | ACCOUNTING | NEW YORK, NY  |
|   2000 | ted      | 10      | ACCOUNTING | NEW YORK, NY  |
|   5000 | joe      | 10      | ACCOUNTING | NEW YORK, NY  |
|   3000 | mat      | 20      | RESEARCH   | DALLAS, TX    |
|   4000 | max      | 20      | RESEARCH   | DALLAS, TX    |
+--------+----------+---------+------------+---------------+
5 rows in set (0.00 sec)

A map-side join is similar to a regular join, but all the work is performed by the mapper alone (note that the result of a map-side join must be identical to the result of the INNER JOIN).

Given two tables A and B, the map-side join is most suitable when table A is large and table B is small to medium. Since table B is not very big, we create a hash table from B and broadcast it to all worker nodes. Next, a mapper iterates over all elements of table A and looks up the broadcast hash table (which represents table B).

Create two RDDs:

First, we create EMP as RDD[(dept_id, (emp_id, emp_name))]:

EMP = spark.sparkContext.parallelize(
[
  (10, (1000, 'alex')),
  (10, (2000, 'ted')),
  (20, (3000, 'mat')),
  (20, (4000, 'max')),
  (10, (5000, 'joe'))
])

Next, we create DEPT as RDD[(dept_id, (dept_name , dept_location))]:

DEPT= spark.sparkContext.parallelize(
[ (10, ('ACCOUNTING', 'NEW YORK, NY')),
  (20, ('RESEARCH', 'DALLAS, TX')),
  (30, ('SALES', 'CHICAGO, IL')),
  (40, ('OPERATIONS', 'BOSTON, MA')),
  (50, ('MARKETING', 'Sunnyvale, CA')),
  (60, ('SOFTWARE', 'Stanford, CA'))
])

Now that both EMP and DEPT share the dept_id key, we can join them:

>>> sorted(EMP.join(DEPT).collect())
[
 (10, ((1000, 'alex'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((2000, 'ted'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((5000, 'joe'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (20, ((3000, 'mat'), ('RESEARCH', 'DALLAS, TX'))),
 (20, ((4000, 'max'), ('RESEARCH', 'DALLAS, TX')))
]

How will the map-side join optimize the task? Here, I assume that EMP is a large data set and DEPT is a relatively small data set. Using a map-side join, to join EMP with DEPT on dept_id, we will create a broadcast variable from the small table (this is a custom hash table builder from an RDD):

# build a dictionary of (key, value),
# where key = dept_id
#       value = (dept_name , dept_location)
#
def to_hash_table(dept_as_list):
  hash_table = {}
  for d in dept_as_list:
    dept_id = d[0]
    dept_name_location = d[1]
    hash_table[dept_id] = dept_name_location
  return hash_table
#end-def

dept_hash_table = to_hash_table(DEPT.collect())

Alternatively, you may build the same hash table by using the Spark action collectAsMap(), which returns the key-value pairs in this RDD (DEPT) to the master as a dictionary. Therefore, we can write:

dept_hash_table = DEPT.collectAsMap()

Now, using pyspark.SparkContext.broadcast(), you can broadcast a read-only variable (dept_hash_table) to the Spark cluster, where it will be available to all transformations (including mappers and reducers).

sc = spark.sparkContext
hash_table_broadcasted = sc.broadcast(dept_hash_table)

Now, how do we perform the map-side join? In your mapper, you can access dept_hash_table via:

dept_hash_table = hash_table_broadcasted.value

Next, I show the map side join by using a map() transformation:

joined = EMP.map(map_side_join)

Finally, map_side_join() is defined as:

# e is an element of the EMP RDD: (dept_id, (emp_id, emp_name))
def map_side_join(e):
  dept_id = e[0]
  # get the hash table from the broadcast object
  hash_table = hash_table_broadcasted.value
  dept_name_location = hash_table[dept_id]
  # return the same shape as the inner join output:
  # (dept_id, ((emp_id, emp_name), (dept_name, dept_location)))
  return (dept_id, (e[1], dept_name_location))
#end-def

Therefore, with a map-side join, you avoid the shuffle phase, which can be very costly, and you avoid significant network I/O.

With a map-side join, we just used a map() function for each row of the EMP table and retrieved the dimension values (such as dept_name and dept_location) from the broadcast hash table. If the EMP table is very large, the map() function will be executed concurrently for each partition, and each partition has its own copy of the hash table. The map-side join approach allows us not to shuffle the large table (EMP) and gives quite good join performance.

Below, I have listed advantages and disadvantages of using a map-side join.

Advantages of using map-side join
  • The true join cost can be reduced since we are minimizing the cost that is incurred for sorting and merging in the shuffle and reduce stages.

  • Map-side join improves the performance of the join task by decreasing the time to finish the join operation.

Disadvantages of map-side join
  • The map-side join pattern is appropriate only when one of the RDDs/tables on which you perform the join is small enough to fit into memory.

  • If neither RDD/table is small enough to fit into memory, then a map-side join is not suitable.

Next, I discuss another design pattern (join using Bloom filters), which can be used to join two tables efficiently.

Efficient Joins using Bloom filters

Given two RDDs A=RDD[(K, V)] (the bigger RDD) and B=RDD[(K, W)] (the smaller RDD), Spark enables us to perform a join operation on the key K. Joining two RDDs is a common operation when working with Spark. In some cases, a join is used as a form of filtering: for example, you want to perform an operation on a subset of the records in RDD[(K, V)], represented by entities in another RDD[(K, W)]. You can use an inner join to achieve that effect, but you want to avoid the shuffle that the join operation introduces, especially if the RDD[(K, W)] you want to use for filtering is significantly smaller than the main RDD[(K, V)] on which you will perform your further computation.

You may do a broadcast join using a set constructed by collecting the smaller RDD you wish to filter by. But this means collecting the whole smaller RDD[(K, W)] in driver memory, and even if it is relatively small (several thousand or million records), that can still lead to some undesirable memory pressure. If you want to avoid the shuffle that the join operation introduces, you can use a Bloom filter instead. The problem of joining RDD[(K, V)] with RDD[(K, W)] is then reduced to a simple filtering transformation, in which we check the key K against the Bloom filter constructed from the smaller RDD[(K, W)].

Bloom filter

A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton H. Bloom in 1970, that is used to test whether an element is a member of a set. False positive matches are possible, but false negatives are not; i.e., a query returns either “possibly in the set” or “definitely not in the set.” Elements can be added to the set, but not removed (though this can be addressed with a “counting” filter). The more elements that are added to the set, the larger the probability of false positives. The Bloom filter may return true for elements that are not actually members of the set (these are called false-positive errors), but it will never return false for elements that are in the set; for each element in the set, the Bloom filter must return true.

Therefore, in a nutshell, we can summarize Bloom filter properties as:

  • Given a big set S = { x~1~, x~2~, ..., x~n~}, a Bloom filter is a probabilistic, fast, and space-efficient structure that approximates the set-membership operation:

    is x in S
  • It tries to answer lookup questions:

    does item x exist in a set S?
  • It allows false positive errors as they only cost us an extra data set access. This means that for some x, which is not in the set, Bloom filter might indicate that x is in the set.

  • It does not allow false negative errors, because they result in wrong answers. This means that if x is not in the set, then for sure it will indicate that x is not in the set. Thus, the Bloom filter does not allow false negatives, but can allow false positives.

Let’s focus on a simple join example between two relations or tables: say we want to join R=RDD(K, V) and S=RDD(K, W) on a common key K. Further assume that:

count(R) = 1,000,000,000  (larger data set)
count(S) =    10,000,000  (smaller data set)

To do a naive join, we would need to examine on the order of 10^16 record pairs (10^9 x 10^7), which is a huge and time-consuming process. One idea to reduce the time and complexity of the join between R and S is to build a Bloom filter on relation S (the smaller data set) and then use that Bloom filter on relation R. This can eliminate the unneeded records from R (it might reduce R to about 20,000,000 records), and hence the join becomes fast and efficient.

Next, we semi-formalize the Bloom filter data structure: what is involved in this probabilistic data structure, how do we construct one, what is the probability of false-positive errors, and how can we decrease that probability? This is how it works:

  • Given a set S = { x~1~, x~2~, ..., x~n~}

  • Let B be a bit array of m bits (m > 1), initialized with 0s. B’s elements are B[0], B[1], B[2], …, B[m-1]. The memory required for array B is only a fraction of the memory needed to store the whole set S. Selecting a bigger bit vector (array B) decreases the false-positive rate.

  • Let { H~1~, H~2~, ..., H~k~} be a set of k hash functions. If H~i~(x~j~) = a, then set B[a] = 1. You may use SHA-1, MD5, or Murmur as hash functions. For example, you may use the following as hash functions:

    • H~i~(x) = MD5(x+i)

    • H~i~(x) = MD5(x || i)

  • To check whether x is in S, check B at positions H~i~(x) for all i = 1, ..., k; all k bits must be 1.

  • It is possible to have a false positive; all k values are 1, but x is not in S.

  • The probability of a false positive is approximately (for details, see Wikipedia):

\left(1 - \left[1 - \frac{1}{m}\right]^{kn}\right)^k \approx \left(1 - e^{-kn/m}\right)^k

  • What is the optimal number of hash functions, and how many bits do we need? For a given m (the number of bits in the Bloom filter) and n (the size of the big data set), the value of k (the number of hash functions) that minimizes the probability of false positives, and the number of bits m required for a desired false-positive probability p, are (ln stands for “natural logarithm”):

k = \frac{m}{n} \ln(2)

m = -\frac{n \ln(p)}{(\ln 2)^2}

Therefore, the probability that a specific bit has been flipped to 1 is:

1 - \left(1 - \frac{1}{m}\right)^{kn} \approx 1 - e^{-kn/m}
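As a minimal Python sketch, we can plug numbers into these formulas; the values of n and p below are made up for illustration:

import math

n = 10000000       # number of elements in the smaller data set
p = 0.01           # desired false-positive probability

# m = -n * ln(p) / (ln 2)^2
m = -n * math.log(p) / (math.log(2) ** 2)
# k = (m / n) * ln(2)
k = (m / n) * math.log(2)

print("bits needed m    =", int(m))       # about 95.8 million bits (~12 MB)
print("hash functions k =", round(k))     # about 7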

Next, let’s take a look at a Bloom filter example.

A Simple Bloom Filter Example

This example shows how to insert into and query a Bloom filter of size 10 (m = 10) with three hash functions H = {H~1~, H~2~, H~3~}; let H(x) denote the result of these three hash functions applied to x. We start with a 10-bit array B initialized to 0 (a toy Python version of this example follows the trace):

Array B:
   initialized:
         index  0  1  2  3  4  5  6  7  8  9
         value  0  0  0  0  0  0  0  0  0  0

   insert element a,  H(a) = (2, 5, 6)
         index  0  1  2  3  4  5  6  7  8  9
         value  0  0  1  0  0  1  1  0  0  0

   insert element b,  H(b) = (1, 5, 8)
         index  0  1  2  3  4  5  6  7  8  9
         value  0  1  1  0  0  1  1  0  1  0

   query element c
   H(c) = (5, 8, 9) => c is not a member (since B[9]=0)

   query element d
   H(d) = (2, 5, 8) => d is a member (False Positive)

   query element e
   H(e) = (1, 2, 6) => e is a member (False Positive)

   query element f
   H(f) = (2, 5, 6) => f is reported as a member
                       (also a false positive, since f was never inserted;
                        its hash values collide with a's)
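The same trace can be written as a tiny toy Bloom filter in Python; the three hash results per element are hard-coded to match the example above (a real filter would compute them with hash functions):

m = 10
B = [0] * m     # 10-bit array, initialized to 0

# hard-coded hash results, matching the example above
H = {'a': (2, 5, 6), 'b': (1, 5, 8),
     'c': (5, 8, 9), 'd': (2, 5, 8),
     'e': (1, 2, 6), 'f': (2, 5, 6)}

def insert(x):
    # set the k bits selected by the hash functions
    for i in H[x]:
        B[i] = 1

def query(x):
    # report membership only if all k bits are set
    return all(B[i] == 1 for i in H[x])

insert('a')
insert('b')
print(query('c'))   # False (B[9] is 0)
print(query('d'))   # True  (a false positive)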

Bloom Filter in Python

The following code segment shows how to create and use a Bloom filter in Python (you may roll your own Bloom filter library, but if you can find a suitable library, use it; here I use the bloom_filter package).

# instantiate BloomFilter with custom settings,
>>> from bloom_filter import BloomFilter
>>> bloom = BloomFilter(max_elements=100000, error_rate=0.01)

# Test whether the bloom-filter has seen a key:
>>> "test-key" in bloom
False

# Mark the key as seen
>>> bloom.add("test-key")

# Now check again
>>> "test-key" in bloom
True

Using Bloom Filter in PySpark

A Bloom filter is a small, compact, and fast data structure for testing set membership. It can be used in the join of two RDDs/relations/tables such as R(K, V) and S(K, W), where one of the relations has a huge number of records (for example, R has 1,000,000,000 records) and the other relation has a smaller number of records (for example, S has 10,000,000 records). Doing a join on the key field K between R and S directly takes a long time and is inefficient.

We can use the Bloom filter data structure in the following way: build a Bloom filter out of relation S(K, W), and then test the records of R(K, V) for membership using the built Bloom filter (distributed with Spark’s broadcast mechanism). Note that, as a reduce-side join optimization, we use the Bloom filter in the map tasks, which reduces the I/O cost of the PySpark job. How do we use the Bloom filter concept in mappers? The following steps show how to use a Bloom filter (a representation of S) in the mappers over R as a substitute for the join operation between R and S.

STEP-1: Build Bloom filter

Construct the Bloom filter: a small PySpark code segment uses the smaller of the two relations/tables to build the Bloom filter data structure. Initialize the Bloom filter (create an instance of BloomFilter), then use BloomFilter.add() to populate it. Let’s call the built Bloom filter the_bloom_filter.
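Here is a minimal sketch of this step; it assumes the bloom_filter package shown earlier and a smaller pair RDD named S (both the package choice and the RDD name are assumptions of this illustration):

from bloom_filter import BloomFilter

# S = RDD[(K, W)] is the smaller relation
the_bloom_filter = BloomFilter(max_elements=10000000, error_rate=0.01)

# collect only the keys of S to the driver and add them to the filter;
# this assumes the keys of S fit comfortably in driver memory
for key in S.keys().collect():
    the_bloom_filter.add(key)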

STEP-2: Broadcast the Built Bloom filter

Use SparkContext.broadcast() to broadcast the built Bloom filter (the_bloom_filter) to all worker nodes, so that it is available to all Spark transformations, including mappers:

# broadcast the Bloom filter to all worker nodes (read-only)
sc = spark.sparkContext
broadcasted_bloom_filter = sc.broadcast(the_bloom_filter)
STEP-3: Use the broadcasted object in Mappers

Now, we can use the Bloom filter to get rid of the unneeded elements:

# e is an element of R=RDD[(K, V)]
def bloom_filter_function(e):
  # get the Bloom filter from the broadcast object
  the_bloom_filter = broadcasted_bloom_filter.value
  # check the key of e against the Bloom filter
  key = e[0]
  return key in the_bloom_filter
#end-def

We use bloom_filter_function() to filter R=RDD[(K, V)], keeping an element if and only if its key is (probably) in S=RDD[(K, W)], allowing for a small rate of false positives.

# R=RDD[(K, V)]
# joined = RDD[(K, V)] where K is in S=RDD[(K, W)]
joined = R.filter(bloom_filter_function)

Summary

MapReduce design patterns are common patterns in data-related problem solving. These design patterns enable us to solve similar data problems in an efficient manner.

Some of the MapReduce design patterns are:

  • Summarization patterns: get a top-level view by summarizing and grouping data

  • Filtering patterns: view data subsets by using predicates

  • Data organization patterns: reorganize data to work with other systems, or to make MapReduce analysis easier

  • Join patterns: analyze different datasets together to discover interesting relationships

  • Meta patterns: piece together several patterns to solve multi-stage problems, or to perform several analytics in the same job

  • Input and output patterns: customize the way you use persistent store (such as HDFS and S3) to load or store data

Some concrete examples of these patterns are:

  • Word Count

  • MinMax Count

  • Summarization

  • Map-side Join

  • Join with Bloom filter
