The goal of this chapter is to introduce some informal but practical data design patterns. I will discuss only the design patterns that are useful in production environments, and will not cover purely theoretical patterns that are not deployed in big data solutions.
This chapter
Introduces the basic concepts of design patterns in the context of big data
Provides some useful and practical design patterns by examples
Shows how to use Spark’s transformations for implementing data design patterns
Introduces the concept of “monoids” for a better understanding of reduction transformations
The best design patterns book is the iconic computer science book “Design Patterns: Elements of Reusable Object-Oriented Software” by four authors: Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides (known as the “Gang of Four”). I will not present data design patterns in the style of the “Gang of Four” book, but will focus on practical data design patterns. My presentations are informal and practical, and they have been used in production environments.
What is a design pattern? According to Wikipedia: “in software engineering, a design pattern is a general reusable solution to a commonly occurring problem within a given context in software design. A design pattern is not a finished design that can be transformed directly into source or machine code.”
Spark, MapReduce, and distributed programming design patterns are common patterns in data-related problem solving. MapReduce is a programming model (Hadoop, for example, is a concrete implementation of MapReduce) and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. A typical MapReduce job consists of a driver and three functions:

map(): a mapper, which supports an equivalent of Spark’s map(), flatMap(), and filter()

reduce(): a reducer, which supports an equivalent of Spark’s reduceByKey() and groupByKey()

combine(): an optional local reducer
For example, in software engineering, the “singleton” pattern is a design pattern that restricts the instantiation of a class to one object. In MapReduce, InMapper Combiner is a design pattern that does most of the combine() functionality inside the mappers (in order to avoid generating too many intermediate (key, value) pairs). A Combiner transformation, also known as a semi-reducer, is an optional operation in the MapReduce paradigm that accepts the inputs from the mappers and passes the output key-value pairs to the reducers. The main function of a Combiner is to summarize the mapper outputs with the same key. Another benefit of design patterns is that they make the objective of the code easier to understand, and they document known scalability issues (if any), performance profiles, and limitations of solutions.
To be practical, I will focus only on data design patterns that can help us develop big data solutions deployable to production environments without scalability problems. These data design patterns help us write scalable solutions to be deployed on Spark clusters. Overall, there is no silver bullet in adopting and using design patterns and data design patterns: every adopted pattern should be tested in detail (in environments similar to production) for performance and scalability by using real data.
For learning design patterns in software engineering, you should look at the “Gang of Four” book (Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma et al.). For learning design patterns in MapReduce, you should look at the book MapReduce Design Patterns by Donald Miner and Adam Shook and my book Data Algorithms (published by O’Reilly).
Below is a partial list of data design patterns; I will cover some of them in this chapter.
MinMax
Top-10
Secondary Sorting
Stripes
InMapper Combining
Join patterns (inner-join, left-join, …)
Summarization Patterns
Filtering Patterns
Data Organization Patterns
Duplicate Detection
Next, I discuss the InMapper Combining data design pattern with some simple examples.
InMapper Combining refers to a situation where a mapper combines and summarizes its outputs as much as possible, emitting fewer intermediate (key, value) pairs for the sort and shuffle phase and the reducers (such as reduceByKey(), groupByKey(), ...). For example, for the classic word count problem, given an input record such as:
"fox jumped and fox jumped again fox"
Without using an InMapper Combiner, we will generate the following (key, value) pairs (before handing them to the shufflers and then to the reducers):
(fox, 1) (jumped, 1) (and, 1) (fox, 1) (jumped, 1) (again, 1) (fox, 1)
The problem is that for big data, we might generate too many (word, 1) pairs, which can keep the cluster network too busy and be less efficient. Using the InMapper Combiner data design pattern, we combine (summarize) the mapper outputs to generate fewer (key, value) pairs. For example, since there are three (fox, 1) pairs, they will be combined into (fox, 3):
(fox, 3) (jumped, 2) (and, 1) (again, 1)
The basic concept of the “InMapper Combiner” data design pattern is illustrated by Figure 19.1.
Therefore, in the mapper transformation, using the InMapper combiner, the three (fox, 1) pairs are replaced by (fox, 3). Without the InMapper combiner, the word count generated seven (key, value) pairs, but with the InMapper combiner we generated only four (key, value) pairs. If we have a lot of repeated words in our data, then the InMapper Combiner can help us achieve better performance by generating fewer intermediate (key, value) pairs.
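As a quick sketch of this idea in PySpark (the function name and usage here are illustrative assumptions, not code from this chapter), a per-record in-mapper combiner for word count might look like this:

from collections import defaultdict

def wordcount_inmapper_combiner(record):
    # combine inside the mapper: emit one (word, count) per unique word
    # in this record, instead of one (word, 1) per occurrence
    counts = defaultdict(int)
    for word in record.lower().split():
        counts[word] += 1
    return list(counts.items())

# usage (illustrative): given rdd = spark.sparkContext.textFile(...)
# word_counts = rdd.flatMap(wordcount_inmapper_combiner).reduceByKey(lambda a, b: a + b)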
To demonstrate the InMapper Combining data design pattern concepts, we solve the problem of counting character frequencies for a set of documents. In simple terms, we want to find the frequency of each unique character in a given corpus. We will discuss the following solutions:
Basic MapReduce Design Pattern
InMapper Combining Per Record
InMapper Combining Per Partition
To count the characters, for each input record we split the record into a set of words, then we split each word into a character array, and finally we emit (key, value) pairs, where the key is a single character from the character array and the value is 1 (a frequency count of one character). Since we did not use any custom data types for emitting key-value pairs, we call this the “Basic” MapReduce design pattern. The reducer sums up the frequencies of each unique character. The problem with this solution is that for large data, we emit too many (key, value) pairs, which can impact network traffic and hence the performance of the overall solution. Also, because of the many emitted (key, value) pairs, the “sort and shuffle” phase (grouping values for the same key) might take longer than expected.

Given an RDD of String (as RDD[String]), the PySpark solution is provided below:
First we define a simple function, which accepts a single string record and returns a list of (key, value) pairs, where the key is a character and the value is 1 (the frequency of a character).
def mapper(rec):
    words = rec.lower().split()
    pairs = []
    for word in words:
        for c in word:
            pairs.append((c, 1))
        #end-for
    #end-for
    return pairs
#end-def
Tokenize a record into an array of words
Create an empty list as pairs
Iterate over words
Iterate over a single word
Add each character (c) as (c, 1) to pairs
Return a list of (c, 1) for all characters in a given record
The mapper() function can be simplified as:
def mapper(rec):
    words = rec.lower().split()
    pairs = [(c, 1) for word in words for c in word]
    return pairs
#end-def
Next, we use the mapper() function to count frequencies of unique characters:
# spark : an instance of SparkSession
rdd = spark.sparkContext.textFile("/dir/input")
pairs = rdd.flatMap(mapper)
frequencies = pairs.reduceByKey(lambda a, b: a + b)
Create an RDD[String] from input data
Map each record into a collection of (character, 1) pairs and flatten it into a new RDD
Find frequencies of each unique character
The InMapper Combining data design pattern reduces the number of (key, value) pairs emitted by mappers, which in turn reduces network traffic and improves execution time.
This section introduces the InMapper Combining Per Record design pattern, also known as the Local Aggregation Per Record solution. It is very similar to the basic Spark MapReduce algorithm, with the exception that for a given input record, we aggregate the frequencies of each character before emitting (key, value) pairs. Since the same character may be repeated many times in a given input record, in this solution we emit (key, value) pairs where the key is a unique character within the record and the value is the aggregated frequency of that character in the record. Then we use reduceByKey() to aggregate all frequencies of each unique character. This solution uses “local aggregation” by leveraging the associativity and commutativity of the reduce() function to combine values before the reducers fetch them across the network. Typically, in MapReduce frameworks (such as Hadoop), combiners are viewed by the runtime as an optional local optimization. Note that the correctness of the algorithm cannot depend on the combiner (instead we use “forced in-mapper combining”). For example, for the following input record:
foxy fox jumped over fence
we will emit the following (key, value) pairs:
(o, 3) (m, 1) (v, 1) (x, 2) (p, 1) (r, 1) (y, 1) (e, 4) (n, 1) (j, 1) (d, 1) (c, 1)
Given an RDD of String (RDD[String]), the PySpark solution is provided below:

First we define a simple function, which accepts a single string record (an element of the input RDD[String]) and returns a list of (key, value) pairs, where the key is a unique character and the value is the aggregated frequency of that character.
import collections

def local_aggregator(record):
    hashmap = collections.defaultdict(int)
    words = record.lower().split()
    for word in words:
        for c in word:
            hashmap[c] += 1
        #end-for
    #end-for
    print("hashmap=", hashmap)
    #
    pairs = [(k, v) for k, v in hashmap.items()]
    print("pairs=", pairs)
    return pairs
#end-def
The collections module provides high-performance container datatypes
Create an empty dictionary[String, Integer]; defaultdict is a dict subclass that calls a factory function to supply missing values
Tokenize record into an array of words
Iterate over words
Iterate over each word
Aggregate characters
Flatten dictionary into a list of (character, frequency)
Return the flattened list of (character, frequency)
Next, we use the local_aggregator() function to count frequencies of unique characters:
input_path = '/tmp/your_input_data.txt'
rdd = spark.sparkContext.textFile(input_path)
pairs = rdd.flatMap(local_aggregator)
frequencies = pairs.reduceByKey(lambda a, b: a + b)
This solution emits far fewer (key, value) pairs, which is an improvement over the previous solution. The problem with this solution is that we instantiate and use a dictionary per input record. If there are very many records, this might create an OOM problem; but if the records are not too many, this solution will scale out. Another improvement is in the “sort and shuffle” phase: since we do not have too many (key, value) pairs before reduction, the sort and shuffle will execute faster than in the previous solution.
Next, I present the InMapper Combiner Per Partition, which is the most efficient of all the InMapper combiners. This is because we use the least amount of memory to summarize and combine (key, value) pairs per partition (which can comprise millions of data elements). The InMapper Combiner Per Partition solution emits frequencies of characters per partition (rather than per record) of input data. Each input partition is a set of input records rather than a single input record. Therefore, there is another way to emit the frequency of characters: build a hash table of dict[Character, Integer] from the characters of a given “input partition” (as opposed to an “input record”) and then emit the (key, value) pairs, where the key is dict.Entry.getKey() and the value is dict.Entry.getValue(). The mapper phase will thus emit a set of (key, value) pairs, where each key is an entry of the built hash table. In this solution, we use a single hash table per input partition (rather than per input record) to keep track of the frequencies of all characters in a given partition. After the mapper (using PySpark’s mapPartitions() transformation) completes the given partition, we emit all key-value pairs from the frequencies table (the built hash table). Then the reducers will sum up the frequencies and find the final count of characters. Note that the mapper using a hash table (the “InMapper Combiner” design pattern) is more efficient than the previously presented solutions because it does not emit too many key-value pairs, which improves network efficiency by sending less data. The solution will scale out better than the previous two solutions: we use only a single hash table per input partition rather than a hash table per input record. This eliminates possible OOM problems. Even if we partition our input into thousands of partitions, this solution scales out very well.
It is clear that the “Basic” MapReduce algorithm generates a huge number of key-value pairs compared to the “InMapper Combining” design pattern. The “InMapper Combining” data structure representation is much more compact, since every entry of dict[Character, Integer] is equivalent to N basic key-value pairs, where N is equal to dict.Entry.getValue(). The “InMapper Combining” approach also generates fewer and shorter intermediate keys, and therefore the execution framework has less sorting to perform. In using the “InMapper Combining” design pattern, you need to be careful about the size of the hash table to make sure it will not cause bottlenecks. For the character count problem, the size of the hash table for each mapper (per input partition) will be very small (since we have a limited number of unique characters), therefore there is no performance bottleneck for the presented solution.
For example, for the following “input partition” (as opposed to a single record):
foxy fox jumped over fence foxy fox jumped foxy fox
we will emit the following (key-value) pairs:
(f, 7) (u, 2) (v, 1) (j, 2) (y, 3) (o, 7) (m, 1) (r, 1) (d, 2) (e, 5) (x, 6) (p, 2) (n, 1) (c, 1)
Given an RDD of String (RDD[String]), the PySpark solution is provided below:
First we define a simple function, which accepts a single input partition (comprised of many input records) and then returns a list of (key, value) pairs, where key is a character and value is an aggregated frequency of that character.
from collections import defaultdict

def inmapper_combiner(partition_iterator):
    hashmap = defaultdict(int)
    for record in partition_iterator:
        words = record.lower().split()
        for word in words:
            for c in word:
                hashmap[c] += 1
            #end-for
        #end-for
    #end-for
    print("hashmap=", hashmap)
    #
    pairs = [(k, v) for k, v in hashmap.items()]
    print("pairs=", pairs)
    return pairs
#end-def
partition_iterator represents a single input partition comprised of a set of records
Create an empty dictionary[String, Integer]
Get a single record from a partition
Tokenize record into an array of words
Iterate over words
Iterate over each word
Aggregate characters
Flatten dictionary into a list of (character, frequency)
Return the flattened list of (character, frequency)
Next, we use the inmapper_combiner() function to count frequencies of unique characters:
rdd = spark.sparkContext.textFile("/.../input")
pairs = rdd.mapPartitions(inmapper_combiner)
frequencies = pairs.reduceByKey(lambda a, b: a + b)
Create an RDD[String] from input file(s)
The mapPartitions() transformation returns a new RDD by applying a function to each input partition (as opposed to each single input record) of this RDD
This solution emits far fewer (key, value) pairs, which is an improvement over the previous two solutions. It is very efficient because we instantiate and use a single dictionary per input partition (rather than per input record). Even if we have many mappers, this will not create an OOM problem; no matter the number of input partitions, this solution will scale out. Another improvement is in the “sort and shuffle” phase: since we do not have too many (key, value) pairs before reduction, the sort and shuffle will execute much faster than in the previous two solutions. The “InMapper Combining” design pattern reduces the amount of data that needs to be transferred between the mapper and the reducer. However, the “InMapper Combining” algorithm may run into a problem if the number of unique keys grows too large for the associative array to fit in memory; if that is the case, you will have to revert to the basic Pairs design pattern approach. Note that with “InMapper Combining,” the mappers generate only those key-value pairs that need to be shuffled across the network to the reducers.
In implementing the “InMapper Combining” design pattern, we used Spark’s powerful mapPartitions() transformation to transform each input partition into a single dict[Character, Integer], and then we aggregated/merged these into a single final dict[Character, Integer]. For “char counting,” this algorithm is efficient and faster than the other algorithms. When you need to create or aggregate a small amount of information from a large chunk of data (such as finding frequencies of characters, since the number of ASCII characters is very limited), the “InMapper Combining” design pattern makes sense, and the best way to implement it is with Spark’s mapPartitions() transformation.
We should also consider the scalability bottlenecks of the “InMapper Combining” design pattern. Using this pattern, we make the assumption that, at any point in time, each associative array per mapper partition (i.e., dict[Character, Integer]) is small enough to fit into memory; otherwise, memory paging will significantly impact performance. For character count, the size of the associative array (per mapper partition) is bounded by the number of unique characters. Therefore, for the character count problem, there are no scalability bottlenecks in using the “InMapper Combining” design pattern.
But what are the advantages and disadvantages of the InMapper Combining design pattern? It has the following advantages and disadvantages:
Advantages
Mappers create fewer (key, value) pairs
Far less sorting and shuffling of (key, value) pairs
Can make better use of combiners as optimizers
Scale-out solution
Disadvantages
More difficult to implement (you need some custom functions for handling each partition)
Underlying object (per mapper partition) is more heavyweight
Fundamental limitation in terms of size of the underlying object (for the character count problem: an associative array per mapper partition)
The InMapper Combining design pattern enables us to reduce the number of (key, value) pairs emitted by mappers. Hence, it can improve the performance of your entire data transformation.
The Top-10 design pattern is illustrated by Figure 19.3.
Creating a Top-10 list is a common task in many data intensive operations. For example, we might ask the following questions:
What are the top-10 list of the URLs visited last day/week/month?
What are the top-10 list of items purchased from Amazon last week/month?
What are the top-10 list of search queries from Google in the last day/week/month?
What are the top-10 list of liked items from Facebook yesterday?
What are the top-10 list of cartoons of all time?
For example, for a search engine (such as Google), it is desirable to know the Top-10 visited URLs per day, week, or month. If we have a table with two columns, (url, frequency), then finding the Top-10 URLs in SQL is straightforward:
SELECT url, frequency
FROM url_table
ORDER BY frequency DESC
LIMIT 10;
Also, finding the Top-N (where N > 0) records in Spark is very easy: let R be an RDD of (String, Integer) pairs, where the key is a String (representing a URL) and the value is the frequency of that URL. Then we may use R.takeOrdered(N) to find the Top-N list. The general format of RDD.takeOrdered() is given below:
takeOrdered(N, key=None)
Description: Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.
Assuming that N is an integer greater than 0, RDD.takeOrdered() solves the Top-N list as:
N = 10
# Sort by keys (ascending):
RDD.takeOrdered(N, key=lambda x: x[0])
# Sort by keys (descending):
RDD.takeOrdered(N, key=lambda x: -x[0])
# Sort by values (ascending):
RDD.takeOrdered(N, key=lambda x: x[1])
# Sort by values (descending):
RDD.takeOrdered(N, key=lambda x: -x[1])
For the sake of argument, assume that takeOrdered() does not have optimal performance for a very large data set. What other options do we have? We might use the Top-10 design pattern for the MapReduce paradigm. How does it work? We’ll discuss that throughout the rest of this section.
Given a large set of { (key-as-string, value-as-integer) } pairs, finding a Top-N (where N > 0) list is a “design pattern”: an independent, reusable solution to the common Top-10 problem, which enables us to produce reusable code. For example, let key-as-string be a URL and value-as-integer be the number of times that URL was visited; then you might ask: what are the Top-10 URLs for the given data (that is, find the 10 URLs with the highest frequencies)? This kind of question is common for this type of (key, value) pairs. Finding a “Top-10 list” falls into the “Filtering Patterns” category: you filter out data and find the Top-10 list. Also note that the Top-10 function is commutative and associative, and therefore using partitioners, combiners, and reducers will always produce correct results. Let T be a Top-10 function, and let a, b, and c be sets of values (such as frequencies) for the same key; then we can write:

Commutative:
T(a, b) = T(b, a)

Associative:
T(a, T(b, c)) = T(T(a, b), c)
This section provides a complete PySpark solution for the Top-10 design pattern. Given a pair RDD of (String, Integer), the goal is to find the Top-10 list for the given RDD. In our solution, we assume that all keys are unique. If the keys are not unique, you may use reduceByKey() (before finding the Top-10) to make all keys unique.
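For example, here is a minimal sketch of that pre-step (assuming a hypothetical pair RDD named url_freq of (URL, frequency) pairs):

# url_freq : RDD[(String, Integer)], possibly with duplicate URLs
# merge duplicate keys by summing their frequencies, so every key is unique
unique_url_freq = url_freq.reduceByKey(lambda a, b: a + b)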
Our MapReduce solutions will generalize the “top 10 list” and will be able to find “top-N list” (for N > 0). For example, we will be able to find “top 10 cats”, “top 50 most visited web sites”, or “top 100 search queries of a search engine”.
Let N be an integer, N > 0. Let L be a list of pairs of (T, Integer), where T can be any type (such as String, URL, ...), with L.size() = s and s > N. Let the elements of L be:

L = { (K_1, V_1), (K_2, V_2), ..., (K_s, V_s) }

where K_i has the data type T and V_i is an Integer (this is the frequency of K_i). Let sort(L) be a sorted list, where the sort is done by using the frequency as a key:

sort(L) = { (A_1, B_1), (A_2, B_2), ..., (A_s, B_s) }, where B_1 >= B_2 >= ... >= B_s

and each (A_j, B_j) is in L. Then the top-N of list L is defined as the first N elements of the sorted list:

topN(L) = { (A_1, B_1), (A_2, B_2), ..., (A_N, B_N) }
For the Top-N solution, we will use Python’s SortedDict. Before providing the Top-N solution, let’s understand SortedDict. Python’s sorted dict (SortedDict, from the sortedcontainers package) is a sorted mutable mapping whose keys are maintained in sorted order. The design of SortedDict is simple: it inherits from dict to store items and maintains a sorted list of keys. SortedDict keys must be hashable and comparable, and the hash and total ordering of keys must not change while they are stored in the sorted dict.
To implement Top-N, we need a hash table data structure, such as sortedcontainers.SortedDict(), that maintains a total order on its keys (here the keys represent frequencies). The easy way to build a Top-N list in Python is to use SortedDict(), a dictionary ordered according to the natural ordering of its keys. We will keep adding (frequency, url) pairs to the SortedDict(), but keep its size at N (note that frequency is the key, since SortedDict() entries are sorted by key). When the size reaches N+1, we pop out the smallest frequency with SortedDict.popitem(0).
An example of SortedDict() usage is given below:
>>> from sortedcontainers import SortedDict
>>> sd = SortedDict({10: 'a', 2: 'm', 3: 'z', 5: 'b', 6: 't', 100: 'd', 20: 's'})
>>> sd
SortedDict({2: 'm', 3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 100: 'd'})
>>> sd.popitem(0)
(2, 'm')
>>> sd
SortedDict({3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 100: 'd'})
>>> sd[50] = 'g'
>>> sd
SortedDict({3: 'z', 5: 'b', 6: 't', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd.popitem(0)
(3, 'z')
>>> sd
SortedDict({5: 'b', 6: 't', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd[9] = 'h'
>>> sd
SortedDict({5: 'b', 6: 't', 9: 'h', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>> sd.popitem(0)
(5, 'b')
>>> sd
SortedDict({6: 't', 9: 'h', 10: 'a', 20: 's', 50: 'g', 100: 'd'})
>>>
>>> len(sd)
6
Next, I present a Top-10 solution using PySpark. The PySpark solution is pretty straightforward: we process the RDD in M partitions (where M > 1) by using the mapPartitions() transformation. Each mapper finds a local “top-N” list (for N > 0) and then passes it to a single reducer. The single reducer then finds the final “top-N” list from all the local “top-N” lists passed by the mappers. In general, in most MapReduce algorithms, having a single reducer is problematic and causes a performance bottleneck (one reducer on one server receives all the data, which can be a very big data volume, while all other cluster nodes do nothing, so the entire pressure and load is on a single node). Here, our single reducer will not cause a performance problem. Here is why: suppose we have 5,000 partitions; each mapper will generate only 10 (key, value) pairs, so our single reducer will get only 50,000 records, which is not nearly enough data to cause a performance bottleneck.
Our high level solution for Top-10 Design Pattern is illustrated by Figure 19.3.
The Top-10 algorithm is presented below. Input is partitioned into smaller chunks, and each chunk is sent to a mapper. Each mapper creates a local top-10 list and then emits it to be sent to the reducers. In emitting the mappers’ output, we use a single reducer key so that all mapper outputs will be consumed by a single reducer.
Let spark be an instance of SparkSession. This is how the Top-10 problem is solved by using the mapPartitions() transformation and a custom Python function named top10_design_pattern():
pairs = [('url-1', 10), ('url-2', 8), ('url-3', 4), ...]
rdd = spark.sparkContext.parallelize(pairs)
# mapPartitions(f, preservesPartitioning=False)
# Return a new RDD by applying a function
# to each partition of this RDD.
top10 = rdd.mapPartitions(top10_design_pattern)
To complete the implementation, we present the top10_design_pattern() function, which finds the top-10 for each partition (each partition is a set of (key, value) pairs):
from sortedcontainers import SortedDict
#
def top10_design_pattern(partition_iterator):
    sd = SortedDict()
    for url, frequency in partition_iterator:
        sd[frequency] = url
        if (len(sd) > 10):
            sd.popitem(0)
    #end-for
    print("local sd=", sd)
    #
    pairs = [(k, v) for k, v in sd.items()]
    print("top 10 pairs=", pairs)
    return pairs
#end-def
The partition_iterator is an iterator over a single partition: a set of (URL, frequency) pairs
Create an empty SortedDict of (Integer, String)
Iterate over a set of (URL, frequency) pairs
Put (frequency, URL) into the SortedDict
Keep/limit the size of the sorted dictionary to 10 (remove the lowest frequency)
Convert the SortedDict (which is a local top-10) into a list of (k, v) pairs
Return a local top-10 list for a single partition
This is how it works: input is divided into a number of partitions. The number of partitions can be 8, 20, 100, 4000, ...; it depends on the size of the cluster and the available memory, disks, CPUs, and other resources. Note that a programmer can also set the number of partitions explicitly. Each partition is a set of (URL, frequency) pair elements. Each mapper accepts a partition of elements, where each element is a pair of (URL, frequency). After the mapper finishes creating a top-10 list as SortedDict[Integer, String], the function returns the local top-10 list. Note that we use a single dictionary (such as a SortedDict) per partition (not per element of the source RDD).
The PySpark implementation is presented below. Note that this solution assumes that all URLs (i.e., keys) are unique. If the URLs are not unique, then you may use the reduceByKey() transformation to make all keys unique.
# cat /tmp/urls.txt
url-0001,12
url-0002,22
url-0003,1000
url-0004,199
...
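The full program is not reproduced here; the following is a minimal driver sketch under stated assumptions (the input path /tmp/urls.txt shown above, the top10_design_pattern() function defined earlier, and a final client-side merge of the local top-10 lists, which is one reasonable way to implement the single reducer):

from pyspark.sql import SparkSession
from sortedcontainers import SortedDict

spark = SparkSession.builder.getOrCreate()

# create an RDD[(url, frequency)] from the input shown above
rdd = spark.sparkContext.textFile("/tmp/urls.txt")
pairs = rdd.map(lambda rec: (rec.split(",")[0], int(rec.split(",")[1])))

# each partition emits its local top-10 as (frequency, url) pairs
local_top10 = pairs.mapPartitions(top10_design_pattern)

# the "single reducer": merge all local top-10 lists into the final top-10
final_sd = SortedDict()
for frequency, url in local_top10.collect():
    final_sd[frequency] = url
    if len(final_sd) > 10:
        final_sd.popitem(0)
print("final top-10 =", list(final_sd.items()))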
In the previous sections, we showed how to find the “top-10” list. To find the “bottom-10,” we just need to change one line of code:
Replace the following:

# find top-10
if (len(sd) > 10):
    sd.popitem(0)

with:

# find bottom-10
if (len(sd) > 10):
    sd.popitem(-1)

If the size of the SortedDict is larger than 10, popitem(0) removes the lowest-frequency URL from the dictionary (keeping the top-10), while popitem(-1) removes the highest-frequency URL (keeping the bottom-10).
Next, let’s discuss how to partition your input and RDDs. Partitioning an RDD is a combination of art and science. What is the right number of partitions for your cluster? There is no magic silver-bullet formula for calculating the proper number of partitions. It depends on the number of cluster nodes, the number of cores per server, and the amount of RAM available. My experience indicates that you need to set this by trial and experience. A general rule of thumb for the total number of partitions is the following (for example, with 10 executors and 4 cores per executor, this suggests 2 * 10 * 4 = 80 partitions):
2 * num_executors * cores_per_executor
Also, when you want to create your first RDD, you can set the number of partitions as:
input_path = "/data/my_input_path"
desired_num_of_partitions = 16
rdd = spark.sparkContext.textFile(input_path, desired_num_of_partitions)
The preceding example creates an RDD[String] with 16 partitions. If you do not set the number of partitions explicitly, then Spark will set it to a default number (based on the resources available from the cluster).
If your RDD is already created with some partitions, then you may set a new number of partitions by using the coalesce() function (let rdd be an RDD[T]):

# rdd : RDD[T]
desired_number_of_partitions = 40
rdd2 = rdd.coalesce(desired_number_of_partitions)
The newly created rdd2 (as RDD[T]) will have 40 partitions. According to the Spark documentation, the coalesce() function is defined as:

pyspark.RDD.coalesce: coalesce(numPartitions, shuffle=False)
Description: Return a new RDD that is reduced into numPartitions partitions.
Next, I introduce the MinMax design pattern, whose goal is to find the (minimum, maximum) of a large data set of numbers.
Given a set of billions of numbers, the goal is to find the minimum, maximum, and count of all the given numbers. MinMax is a “Numerical Summarizations” design pattern. This pattern can be used in scenarios where the data you are dealing with (or want to aggregate) is of a numerical type and can be grouped by specific fields. To explain the MinMax design pattern, I am going to present three different solutions with quite different performance characteristics.
The naive approach will be to emit the following (key, value) pairs:
("min", number) ("max", number) ("count", 1)
for all of the numbers in the given input data set; the sort and shuffle phase will then group all values by the three keys “min”, “max”, and “count”, and finally we may use a reducer to iterate over all the numbers and find the “min”, “max”, and “count”. The problem with this approach is that we have to move billions of (key, value) pairs over the network and then create three huge Iterable<Long> collections (assuming the numbers are of the Long data type). This is a possible solution, but it might not scale out and might have serious performance problems. In the reduction phase, this solution will not utilize the whole cluster, because there are only three unique keys: “min”, “max”, and “count”.
The next solution sorts all the numbers; then you take the last element (for “max”), the first element (for “min”), and the count of the data set. If the performance is acceptable, then this is a good solution, but if you have a lot of numbers, the sorting time might not be acceptable.
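A minimal sketch of this sorting-based approach (assuming a hypothetical RDD[int] named numbers; this is illustrative, not the chapter's code):

# numbers : RDD[int]
sorted_rdd = numbers.sortBy(lambda n: n)   # full sort: expensive for big data
count = sorted_rdd.count()
minimum = sorted_rdd.first()               # smallest element
maximum = sorted_rdd.top(1)[0]             # largest element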
mapPartitions()
The final solution (the most efficient from a performance and scalability point of view) splits the data into N chunks (partitions), and then uses Spark’s mapPartitions() transformation to emit three (key, value) pairs from each partition:
("min", the-minimum-number-in-a-partition) ("max", the-maximum-number-in-a-partition) ("count", count-of-numbers-in-a-partition)
Finally, we find the min, max, and count over all partitions. This solution scales out very well: no matter how many partitions we have, it will work and will not create OOM errors. For example, if you have 500 billion numbers (assume one or more numbers per record), then partition the input into 100,000 partitions (in the worst case of one number per record, each partition will have 5 million records), which means each partition will get about 5 million numbers. Each partition will then emit three pairs as illustrated above. Finally, you find the min, max, and count of 100,000 x 3 = 300,000 values. Finding the min, max, and count of 300,000 numbers is trivial and will not cause any scalability problems at all.
The high-level solution is illustrated by the Figure 19.5.
We assume that our input records have the following format:
<number><,><number><,><number>...
Sample records are presented below:
10,345,24567,2,100,345,9000,765 2,34567,23,13,45678,900 ...
Here, we present the PySpark solution to the MinMax problem. For details, see the minmax_use_mappartitions.py program.
input_path = <your-input-path>
rdd = spark.sparkContext.textFile(input_path)
min_max_count = rdd.mapPartitions(minmax)
min_max_count_list = min_max_count.collect()
final_min_max_count = find_min_max_count(min_max_count_list)
Return a new RDD from the given input
Return an RDD of (min, max, count) triplets from each partition by applying the minmax() function
Collect the (min, max, count) triplets from all partitions as a list
Find the (final_min, final_max, final_count) by calling find_min_max_count() (sketched below)
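The find_min_max_count() helper is not shown in the snippet above; a minimal sketch (my assumption of its behavior, based on how it is called) would merge the per-partition triplets:

def find_min_max_count(triplets):
    # triplets: a list of (local_min, local_max, local_count), one per partition
    final_min = min(t[0] for t in triplets)
    final_max = max(t[1] for t in triplets)
    final_count = sum(t[2] for t in triplets)
    return (final_min, final_max, final_count)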
Next, the minmax() function is presented:
def minmax(iterator):
    #print("type(iterator)=", type(iterator))
    # ('type(iterator)=', <type 'itertools.chain'>)
    #
    first_time = False
    #
    for record in iterator:
        numbers = [int(n) for n in record.split(",")]
        if (first_time == False):
            # initialize count, min, max to the 1st record values
            local_min = min(numbers)
            local_max = max(numbers)
            local_count = len(numbers)
            first_time = True
        else:
            # update count, min, and max
            local_count += len(numbers)
            local_max = max(max(numbers), local_max)
            local_min = min(min(numbers), local_min)
    #end-for
    return [(local_min, local_max, local_count)]
#end-def
The iterator is of type itertools.chain
Print the type of the iterator (for debugging only)
Define some variables for the return value
Iterate over the iterator (record holds a single record)
Tokenize the input and build an array of numbers
If it is the first record, then initialize min, max, and count
If it is not the first record, then update local_min, local_max, and local_count
Finally, return a triplet from each partition
What if some of the partitions are empty? Is it possible for a partition to be empty? Yes, there are many reasons for that (according to Spark documentation). What is an empty partition? An empty partition is a partition with no data at all. Therefore, we have to handle empty partitions gracefully (in case there are empty partitions).
Here I will show how to handle empty partitions. Error handling in Python is done through the use of exceptions, which are caught in try blocks and handled in except blocks: if an error is encountered, execution of the try block code is stopped and control is transferred down to the except block.
def minmax(iterator):
    #print("type(iterator)=", type(iterator))
    # ('type(iterator)=', <type 'itertools.chain'>)
    #
    try:
        first_record = next(iterator)
    except StopIteration:
        return [(1, -1, 0)] # WHERE min > max to filter out later
    #
    # initialize count, min, max to the 1st record values
    numbers = [int(n) for n in first_record.split(",")]
    local_min = min(numbers)
    local_max = max(numbers)
    local_count = len(numbers)
    #
    for record in iterator:
        numbers = [int(n) for n in record.split(",")]
        # update min, max, count
        local_count += len(numbers)
        local_max = max(max(numbers), local_max)
        local_min = min(min(numbers), local_min)
    # end-for
    return [(local_min, local_max, local_count)]
The iterator is of type itertools.chain
Print the type of the iterator (for debugging only)
Try to get the first record from the given iterator; if successful, first_record is initialized to the first record of the partition
If you get here, the partition is empty: return a fake triplet
Set min, max, and count from the first record
Iterate over the iterator for the 2nd, 3rd, ... records (record holds a single record)
Finally, return a triplet from each partition
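Since an empty partition now emits the fake triplet (1, -1, 0), where min > max, a filtering step such as the following sketch (an assumption about the post-processing, consistent with the code comment above) can drop those triplets before the final merge:

min_max_count_list = min_max_count.collect()
# drop the fake (1, -1, 0) triplets emitted by empty partitions (min > max)
valid_triplets = [t for t in min_max_count_list if t[0] <= t[1]]
final_min_max_count = find_min_max_count(valid_triplets)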
How should we test the handling of empty partitions? The program minmax_force_empty_partitions.py (source code of Chapter 12 in GitHub) forces the creation of empty partitions and handles them gracefully. How do you force empty partitions to be created? Set the number of partitions higher than the number of input records. For example, if your input has N records, then set the number of partitions to N+3; the partitioner might then create up to 3 empty partitions.
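For example, a minimal sketch of forcing empty partitions (illustrative values, not the program's actual code):

# 5 records, but 8 requested partitions: some partitions will be empty
records = ["10,20,3", "4,5", "6,7,8", "9", "11,12"]
rdd = spark.sparkContext.parallelize(records, 8)
min_max_count = rdd.mapPartitions(minmax)
print(min_max_count.collect())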
This section explores the concept of the “composite pattern” and Monoids and will show how to use “composite pattern” and Monoids in the context of Spark and PySpark.
What is the “composite pattern”? In a nutshell, the composite pattern is a structural design pattern (also called a partitioning design pattern) that can be used when a group of objects can be treated the same as a single object in that group. You may use the composite pattern to create hierarchies of objects and groups of objects, quickly turning the structure into a tree with leaves (objects) and composites (sub-groups). The composite pattern in UML notation is illustrated in Figure 19.6.
According to Wikipedia: “in software engineering, the composite pattern is a partitioning design pattern. The composite pattern describes a group of objects that is treated the same way as a single instance of the same type of object. The intent of a composite is to “compose” objects into tree structures to represent part-whole hierarchies. Implementing the composite pattern lets clients treat individual objects and compositions uniformly.”
For example, using a tree data structure, the composite pattern is illustrated in Figure 19.7. Another example of the composite pattern is adding a set of numbers, which is also illustrated in Figure 19.7; here the numbers are the leaves and the composites are the addition operators.
Next, I discuss the concept of monoids in the context of composite pattern.
We have already seen the use of monoids in using reduction transformations (chapter 4). Here we will look at the monoids in the context of composite pattern. From the composite pattern definition, it should be obvious to see that there is a commonality between Monoids and composite patterns. The composite pattern is a design pattern, which is commonly used in programming languages such as Java, Python, and Scala and also used in big data for composing (such as addition and concatenation operators) and aggregating a set of data points.
To observe the commonality between Monoids and composite patterns, let’s take a look at the definition of “Monoids” (source: Wikipedia):
In abstract algebra, a branch of mathematics, a monoid is an algebraic structure with a single associative binary operation and an identity element. Monoids are studied in semigroup theory, because they are semigroups with identity. Monoids occur in several branches of mathematics; for instance, they can be regarded as categories with a single object. Thus, they capture the idea of function composition within a set. In fact, all functions from a set into itself form naturally a monoid with respect to function composition. Monoids are also commonly used in computer science, both in its foundational aspects and in practical programming.
An application of monoids in computer science is the so-called “MapReduce” programming model. The MapReduce programming model consists of three functions: map(), combine(), and reduce(). These three functions are very similar to the map(), flatMap(), and reduce() transformations in Spark (combine() is an optional operation). Given a dataset, map() consists of mapping arbitrary data to elements of a specific monoid, combine() aggregates/folds data at the local level (worker nodes in the cluster), and finally reduce() consists of aggregating/folding those elements, so that in the end we produce just one element.
So in terms of programming language semantics, a monoid is just an interface with one abstract value and one abstract method. The abstract method of a monoid is the append operation (it can be an addition operator on integers, or a concatenation operator on string objects). The “abstract value” of a monoid is the identity value: for example, for adding a set of integers the identity value is zero, and for concatenating strings it is the empty string (a string of length zero). Note that the identity value is defined as the value you can append to any value and always get the original value back unmodified. For example, the identity value for collection data structures is the empty collection, because appending a collection to an empty collection will typically produce the same collection unmodified.
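As a small sketch of this “interface” view in Python (hypothetical names, for illustration only):

class Monoid:
    def __init__(self, identity, append):
        self.identity = identity   # the abstract value
        self.append = append       # the abstract method (associative)

def fold(monoid, values):
    # combine arbitrarily many values into one, starting from the identity
    result = monoid.identity
    for v in values:
        result = monoid.append(result, v)
    return result

add_monoid = Monoid(0, lambda a, b: a + b)       # integers under addition
concat_monoid = Monoid("", lambda a, b: a + b)   # strings under concatenation

print(fold(add_monoid, [1, 2, 3]))               # 6
print(fold(concat_monoid, ["fox", "jumped"]))    # 'foxjumped'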
In the paper “Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms,” Jimmy Lin clearly defines and relates monoids as a design principle for efficient MapReduce algorithms. But what is a monoid, what properties define it, and how does it aid the MapReduce paradigm? Following abstract algebra, “a monoid is an algebraic structure with a single associative binary operation and an identity element.” Jimmy Lin shows that when your MapReduce operations (map(), reduce(), and reduceByKey() transformations in Spark) are not monoids, it is very hard (and may be impossible) to use “combiners” efficiently. Next we will briefly review MapReduce’s combiners and abstract algebra’s monoids and see how they relate to each other.
Also, David Saile states: “recall that a monoid is an algebraic structure with a single associative binary operation and an identity element. For example, the natural numbers N (the term “natural numbers” refers either to the set of positive integers {1, 2, 3, ...} or to the set of non-negative integers {0, 1, 2, 3, ...}) form a monoid under addition with identity element zero. In classic MapReduce, the mapper is not constrained, but the reducer is required to be (the iterated application of) an associative operation. Recent research argued that reduction is in fact monoidal in known applications of MapReduce. That is, reduction is indeed the iterated application of an associative operation f() with a unit u. In the case of the word-occurrence count example, reduction iterates addition + with 0 as unit. The parallel execution schedule may be more flexible if commutativity is required in addition to associativity.”
In the MapReduce framework, the combiner (an optional plug-in component) is a “local reduce” process that operates only on data generated by one server. Successful use of combiners reduces the amount of intermediate data generated by the mappers on a given single server (that is why it is called a local reducer). Combiners can be used as a MapReduce optimization to reduce network traffic between mappers and reducers (by decreasing the size of the transient data as (key, value) pairs). Typically, combiners have the same interface as reducers. A combiner must have the following characteristics:
A combiner receives as input all the data emitted by the mapper instances on a given server (this is called local aggregation)
The combiner's output is sent to the reducers (some programmers call this a local server reduction!)
The combiner must be side-effect free; combiners may run an indeterminate number of times.
The combiner must have the same input and output key types (see example below)
The combiner must have the same input and output value types (see example below)
The combiner runs in memory after the map phase
Therefore, a combiner skeleton should be defined as:
# key: as KeyType
# values: as Iterable<ValueType>
def combine(key, values):
    ...
    # use key and values to create new_key and new_value
    new_key = <a-value-of-KeyType>
    new_value = <a-value-of-ValueType>
    ...
    return (new_key, new_value)
#end-def
This template indicates that the (key, value) pairs generated by a combiner have to match the (key, value) pairs received (as input) by the reducers. For example, if a mapper outputs (T1, T2) pairs (the key is of type T1 and the value is of type T2), then a combiner has to emit (T1, T2) pairs as well.
MapReduce/Hadoop does not have an explicit combine() function; in Hadoop we just use a reduce() implementation for the combiner, and we plug it in with Job.setCombinerClass() to define the combiner class.
Furthermore, Jimmy Lin concludes that “one principle for designing efficient MapReduce algorithms can be precisely articulated as follows: create a monoid out of the intermediate value emitted by the mapper. Once we “monoidify” the object, proper use of combiners and the in-mapper combining techniques becomes straightforward.” (source: Monoidify! Monoids as a Design Principle for Efficient MapReduce Algorithms)
Some programming languages have direct support for monoids. For example, the Haskell programming language has a direct support for monoids. In Haskell, “a monoid is a type with a rule for how two elements of that type can be combined to make another element of the same type.”
A monoid is a triple (S, f, e), where:

S is a set, called the underlying set of the monoid
f : S x S → S is a mapping, called the binary operation of the monoid
e ∈ S is the identity element of the monoid
A monoid with binary operation + (note that, here, the binary operation is denoted by + and does not necessarily mean the mathematical addition operator) satisfies the following three axioms (note that f(a, b) = a + b):

Closure: for all a, b in S, the result of the operation (a + b) is also in S.

Associativity: for all a, b, and c in S, the following equation holds: ((a + b) + c) = (a + (b + c))

Identity element: there exists an element e in S such that, for all elements a in S, the following two equations hold: e + a = a and a + e = a
In mathematical notation we can write these as:

Closure: ∀ a, b ∈ S: a + b ∈ S
Associativity: ∀ a, b, c ∈ S: ((a + b) + c) = (a + (b + c))
Identity element: ∃ e ∈ S: ∀ a ∈ S: e + a = a and a + e = a
A monoid might have other properties: the monoid operator might (but is not required to) obey additional laws such as:

Idempotency: ∀ a ∈ S: a + a = a
Commutativity: ∀ a, b ∈ S: a + b = b + a
To form a monoid, first we need a type S, which defines a set of values such as the integers {0, -1, +1, -2, +2, ...}. The second component is a binary function:

+ : S x S → S

Then we need to make sure that for any two values x in S and y in S we get a result object, the combination of x and y:

x + y : S
For example, let type S be a set of integers; then the binary operation may be addition (+), multiplication (*), or division (/). Finally, as the third and most important ingredient, we need the binary operation to follow a set of laws. If it does, then S together with the binary operation (+) is called a monoid. We say: (S, +, e) is a monoid, where e in S is the identity element (such as 0 for addition and 1 for multiplication). Also note that the binary division operator (/) over a set of real numbers is not a monoid:

((12 / 4) / 2) not equal (12 / (4 / 2))
((12 / 4) / 2) = (3 / 2) = 1.5
(12 / (4 / 2)) = (12 / 2) = 6.0
In a nutshell, monoids capture the notion of combining arbitrarily many things into a single thing, together with the notion of an empty thing, called the identity. One simple example is addition on the natural numbers {1, 2, 3, ...}: the addition function + allows us to combine arbitrarily many natural numbers into a single natural number, the sum, and the identity is the number zero. Another example is string concatenation: the concatenation operator allows us to combine arbitrarily many strings into a single string, and the identity value is the empty concatenation, the empty string.
To understand the concept of monoids, I have listed some monoid and non-monoid examples below. For example, to use Spark’s reduceByKey() effectively, you must make sure that the reduction function is a monoid, since Spark uses combiners in the reduceByKey() transformation.
The set S = {0, 1, 2, ...} is a commutative monoid under the MAX (maximum) operation, whose identity element is 0 (zero):

MAX(a, MAX(b, c)) = MAX(MAX(a, b), c)
MAX(a, 0) = a
MAX(0, a) = a
MAX(a, b) in S
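A tiny sanity check of these laws in Python (illustrative only):

# check the monoid laws for MAX over non-negative integers
f = max       # the binary operation
e = 0         # the identity element
a, b, c = 3, 7, 5
assert f(a, f(b, c)) == f(f(a, b), c)   # associativity
assert f(a, e) == a and f(e, a) == a    # identity element
assert f(a, b) == f(b, a)               # commutativity (MAX is commutative)
assert f(a, a) == a                     # idempotency (MAX is idempotent)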
The subtraction operator (-) over a set of integers does not define a monoid; the operation is not associative:

(1 - 2) - 3 = -4
1 - (2 - 3) = 2
The addition operator (+) over a set of integers defines a monoid; the operation is commutative and associative, and the identity element is 0:

(1 + 2) + 3 = 6
1 + (2 + 3) = 6
n + 0 = n
0 + n = n

We can formalize this monoid as follows (e(+) denotes the identity element):

S = {0, -1, +1, -2, +2, -3, +3, ...}
e(+) = 0 (note: identity element is 0)
f(a, b) = f(b, a) = a + b
The natural numbers, N = {0, 1, 2, 3, ...}, form a commutative monoid under multiplication (with identity element one).
On the other hand, the natural numbers, N = {0, 1, 2, 3, ...}, do not form a monoid under the mean (average) function. The following example shows that the mean of means of arbitrary subsets of a set of values is not the same as the mean of the whole set of values:

MEAN(1, 2, 3, 4, 5) -- NOT EQUAL -- MEAN( MEAN(1,2,3), MEAN(4,5) )

MEAN(1, 2, 3, 4, 5) = (1+2+3+4+5)/5 = 15/5 = 3
MEAN( MEAN(1,2,3), MEAN(4,5) ) = MEAN(2, 4.5) = (2 + 4.5)/2 = 3.25
Therefore, if you want to find the average of values for a pair RDD (as RDD[(key, integer)]), you may not use the following transformation (which might yield a wrong value due to partitioning):
# let rdd be an RDD[(key, integer)]
average_per_key = rdd.reduceByKey(lambda x, y: (x + y) / 2)
The correct way to find average per key is to make it a monoid:
# let rdd be an RDD[(key, integer)]
# create value as (sum, count) pair: this makes a monoid
rdd2 = rdd.mapValues(lambda n: (n, 1))
# find (sum, count) per key
sum_count = rdd2.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
# now, given (sum, count) per key, find the average per key
average_per_key = sum_count.mapValues(lambda x: x[0] / x[1])
For a non-commutative example, consider the collection of all binary strings: an element is a finite ordered sequence of 0’s and 1’s. The binary operation is just concatenation: e.g. concat(1011, 001001) = 1011001001. The identity element is the “empty string” (String of size 0). Therefore concatenation of binary strings is a monoid.
The natural numbers do not form a monoid under the MEDIAN function:

MEDIAN(1, 2, 3, 5, 6, 7, 8, 9) -- NOT EQUAL -- MEDIAN( MEDIAN(1,2,3), MEDIAN(5,6,7,8,9) )

MEDIAN(1, 2, 3, 5, 6, 7, 8, 9) = (5 + 6) / 2 = 11 / 2 = 5.5
MEDIAN( MEDIAN(1,2,3), MEDIAN(5,6,7,8,9) ) = MEDIAN(2, 7) = (2 + 7) / 2 = 9 / 2 = 4.5
A good example of a monoid is lists of objects. Lists with concatenation (+) and the empty list ([]) are a monoid: for any list L, we can write:

L + [] = L
[] + L = L
Also note that the concatenation function is associative. Given two lists, say [1,2,3] and [7,8], you can join them together using + to get [1,2,3,7,8]. There is also the empty list []. Using + to combine [] with any list gives you back the same list; for example, [] + [1,2,3] = [1,2,3] and [1,2,3] + [] = [1,2,3].
Sets under union or intersection (over subsets of a fixed set, such as the integers) also form a monoid.
Let N = {1, 2, 3, ...} and let m, n ∈ N. Then the set of m x n matrices with integer entries, written Z^(m x n), satisfies properties that make it a monoid under addition:

closure is guaranteed by the definition of matrix addition;
the associative property is guaranteed by the associative property of its elements; and
the additive identity is 0, the zero matrix.
What have we learned from these examples and monoid concepts? Spark’s reduceByKey() is an efficient transformation, which merges the values for each key using an associative and commutative reduce function. Therefore, we have to make sure that reduceByKey()’s reduce function is a monoid; otherwise, you might not get correct reduction results.
Given a large number of (key, value) pairs, where the keys are strings and the values are integers, the goal is to find the average of all the values by key. In SQL, this is accomplished as follows (assuming that our table, named mytable, has key and value columns):
Select all data:

SELECT key, value FROM mytable

key   value
----  -----
key1  10
key1  20
key1  30
key2  40
key2  60
key3  20
key3  30
Select all data and GROUP BY key:

SELECT key, AVG(value) as avg FROM mytable GROUP BY key

key   avg
----  ---
key1  20
key2  50
key3  25
Here is the first version of the MapReduce algorithm, where the mapper does not generate monoid outputs for the mean/average function.
Mapper function:
# key: a string object
# value: a long associated with key
map(key, value) {
    emit(key, value);
}
Reducer function:
# key: a string object
# values: a list of long data type numbers
reduce(key, values) {
    sum = 0
    count = 0
    for (i : values) {
        sum = sum + i
        count++;
    }
    average = sum / count
    emit(key, average)
}
Two observations on this first version of the MapReduce algorithm:

The algorithm is not very efficient: there will be too much work for the “sort and shuffle” functions of the MapReduce framework.
We cannot use the reducer as a combiner: we know that the mean of means of arbitrary subsets of a set of values is not the same as the mean of the whole set of values.
Note that using combiners makes Spark, MapReduce, and distributed algorithms efficient by reducing network traffic (you need to ensure that the combiner provides sufficient aggregation) and by reducing the load on the “sort and shuffle” functions of the MapReduce framework. Now the question is: how can we make our reducer work as a combiner? The answer is to make the output of the mapper a monoid, by changing the output of the mapper. Once your mappers output monoids, combiners and reducers will behave correctly.
Next, let’s look at a monoid example.
In this section, I revise the mapper to generate (key, value) pairs where the key is the string and the value is a pair (sum, count), which has the monoid property. The (sum, count) data structure is a monoid with identity element (0, 0). The proof is given below:

Monoid type is (N, N) where N = {set of integers}
Identity element is (0, 0):
    (sum, count) + (0, 0) = (sum, count)
    (0, 0) + (sum, count) = (sum, count)
Let a = (sum1, count1), b = (sum2, count2), c = (sum3, count3)
Then associativity holds: (a + (b + c)) = ((a + b) + c)
+ is the binary function:
    a + b = (sum1+sum2, count1+count2) in (N, N)
Now, let’s write a mapper (for the MapReduce paradigm) for this monoid data type:

# key: a string object
# value: a long data type associated with key
# emits (key, (sum, count))
map(key, value) {
    emit(key, (value, 1))
}
As you can see, the key is the same as before, but the value is a pair of (sum, count). Now the output of the mapper is a monoid, with identity element (0, 0). The element-wise sum operation can be performed as:
element1 = (key, (sum1, count1))
element2 = (key, (sum2, count2))

values for the same key are reduced as:

element1 + element2 = (sum1, count1) + (sum2, count2)
                    = (sum1+sum2, count1+count2)
Now the mean function will be calculated correctly, since the mappers output monoids. Imagine that the values for a single key are {1, 2, 3, 4, 5}, where {1, 2, 3} goes to partition-1 and {4, 5} goes to partition-2:
MEAN(1, 2, 3, 4, 5) = (1+2+3+4+5) / 5 = 15 / 5 = 3

Partition-1: {1, 2, 3} reduces to (sum=6, count=3)
Partition-2: {4, 5}    reduces to (sum=9, count=2)

Merging partitions:
  (6, 3) + (9, 2) = (15, 5)
  MEAN = 15 / 5 = 3
The revised algorithm, in which the mappers (defined above) output monoids, is presented below. For a given pair (sum, count), pair.1 refers to sum and pair.2 refers to count:
# key: a string object
# values: a list of pairs: [(s1, c1), (s2, c2), ...]
combine(key, values) {
  sum = 0
  count = 0
  for (pair : values) {
    sum += pair.1
    count += pair.2
  }
  emit(key, (sum, count))
}
The reduce() function is presented below:
# key: a string object
# values: a list of pairs: [(s1, c1), (s2, c2), ...]
reduce(key, values) {
  sum = 0
  count = 0
  for (pair : values) {
    sum += pair.1
    count += pair.2
  }
  average = sum / count
  emit(key, average)
}
Therefore, to guarantee that your combiners execute correctly, you must make sure that your mappers generate monoidal data types.
The goal of this section is to provide a monoidized mean solution, meaning that combiners can be used to assist in aggregating the values. To compute the mean of the numbers for the same key, we could group all values per key (using Spark's groupByKey() transformation) and then find the sum and divide it by the count of numbers. This is not an optimal solution, because groupByKey() might create OOM (out of memory) problems.
For the monoidized mean solution, for a given (key, number) we emit (key, (number, 1)). The value associated with a key denotes a pair of (sum, frequency). Earlier, I demonstrated that using (sum, frequency) for the values enables us to use combiners and reducers for proper calculation of the mean function. With this setup, we can use the reduceByKey() transformation, which is a very efficient reducer. How do we reduce? We are given the following two pairs for the same key (in the earlier section, I proved that (sum, count) is a monoid data type for finding the average of values, so using reduceByKey() will work correctly):
(key, value1) = (key, (sum1, count1))
(key, value2) = (key, (sum2, count2))
This is how the reduction function will work:
value1 + value2 = (sum1, count1) + (sum2, count2)
                = (sum1+sum2, count1+count2)
Once the reduction (by reduceByKey()) is done, we need an additional mapper to find the average by dividing the sum by the count (this will be shown in the following subsections).
The input record format will be:

Input Record Format:
  <key-as-string><,><value-as-integer>

Examples:
  key1,100
  key2,46
  key1,300
The high-level solution is presented in the following four steps:

Step 1: Read the input and create the first RDD as RDD[String]
Step 2: Apply map() to create RDD[(key, (number, 1))]
Step 3: Perform reduction by reduceByKey(), which creates RDD[(key, (sum, count))]
Step 4: Apply mapValues() to create the final average RDD as RDD[(key, sum / count)]
The complete PySpark program (average_monoid_driver.py) is listed below.
First, we need two simple Python functions to help us use Spark's transformations.
The first function, create_pair, accepts a String object of the form "key,number" and returns a (key, (number, 1)) pair:
# record: a String of the form "key,number"
def create_pair(record):
    tokens = record.split(",")
    key = tokens[0]
    number = int(tokens[1])
    return (key, (number, 1))
#end-def
The second function, add_pairs, accepts two tuples, (sum1, count1) and (sum2, count2), and returns their element-wise sum, (sum1+sum2, count1+count2):
# a = (sum1, count1)
# b = (sum2, count2)
def add_pairs(a, b):
    # sum = sum1+sum2
    sum = a[0] + b[0]
    # count = count1+count2
    count = a[1] + b[1]
    return (sum, count)
#end-def
Here is the complete PySpark solution:
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

if len(sys.argv) != 2:
    print("Usage: average_monoid_driver.py <file>", file=sys.stderr)
    exit(-1)

# create an instance of SparkSession
spark = SparkSession.builder.getOrCreate()

# sys.argv[0] is the name of the script
# sys.argv[1] is the first parameter
input_path = sys.argv[1]
print("input_path: {}".format(input_path))

# read input and create an RDD<String>
records = spark.sparkContext.textFile(input_path)

# create a pair of (key, (number, 1)) for "key,number"
key_number_one = records.map(create_pair)

# aggregate the (sum, count) of each unique key
sum_count = key_number_one.reduceByKey(add_pairs)

# create the final RDD as RDD[key, average]
averages = sum_count.mapValues(lambda pair: float(pair[0]) / pair[1])
print("averages.take(5): ", averages.take(5))

# done!
spark.stop()
Import the print() function
Import system-specific parameters and functions
Import SparkSession from the pyspark.sql module
Make sure that we have two command-line parameters
Create an instance of a SparkSession object using the builder pattern (SparkSession.builder)
Define the input path (this can be a file or a directory containing any number of files)
Read the input and create the first RDD as RDD[String], where each element has the format "key,number"
Create the key_number_one RDD as (key, (number, 1))
Aggregate (sum1, count1) with (sum2, count2) and create (sum1+sum2, count1+count2) as values
Apply the mapValues() transformation to find the final average per key
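Assuming the sample records shown earlier are stored in a file named sample_input.txt (a hypothetical file name), the program can be submitted as follows; the program's own printed output should look roughly like this:

$ spark-submit average_monoid_driver.py sample_input.txt
input_path: sample_input.txt
averages.take(5):  [('key1', 20.0), ('key2', 50.0), ('key3', 25.0)]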
We observed that in the MapReduce paradigm (which is a foundation of Hadoop, Spark, Tez, ...), if your mappers generate monoids, then you can utilize combiners for optimization and efficiency purposes (using combiners reduces network traffic and makes MapReduce's "sort and shuffle" phase more efficient, because less data is processed). We also showed how to monoidize MapReduce algorithms; the challenge is to monoidize our own algorithms. In general, combiners can be used when the function you want to apply is both commutative and associative (the properties of a commutative monoid). For example, classic word count is a monoid over the set of integers with the + operation, so a combiner can be used (see the sketch below). But the mean function over a set of integers (which is not associative, as shown in the counterexample above) does not form a monoid. When combiners are used properly, they significantly cut down the amount of data shuffled from the mappers to the reducers.
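For instance, a minimal PySpark word count sketch (the input path is a placeholder, and spark is assumed to be an existing SparkSession) can safely rely on reduceByKey() and combiners because integer addition is associative and commutative:

from operator import add

# integer addition is a monoid operation, so reduceByKey()
# (and any combiner) is safe here
counts = (spark.sparkContext.textFile("input.txt")
               .flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(add))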
Now that we have seen the definition and use of monoids in the MapReduce framework, we can even apply higher-order constructs (like "functors") to monoids. A functor is an object that acts as a function (we can say it is a function and an object at the same time). Java 8 has direct support for this style via lambdas (for details, see the Lambda project). Java 6 and Java 7 have no direct concept of functors, because functions are not first-class objects in Java; this means that you cannot pass a function name as an argument to another function. There is a simple way to simulate functors in Java by defining an interface with a single method (this is a very simplistic simulation):
public interface FunctorSimulation<T1, T2> {
    T2 apply(T1 input);
}
First, we present a "functor" on a monoid by a simple example. Let MONOID = (T, e, f) be a monoid, where T is a type (a set of values), e is the identity element, and f is the binary "plus" function:
MONOID = {
  type T
  val e : T
  val plus : T x T -> T
}
Then we define a functor Prod as:
functor Prod (M : MONOID) (N : MONOID) = {
  type t = M.T * N.T
  val e = (M.e, N.e)
  fun plus((x1,y1), (x2,y2)) = (M.plus(x1,x2), N.plus(y1,y2))
}
Then we can define other functors, such as Square:

functor Square (M : MONOID) : MONOID = Prod M M
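To make Prod concrete, here is a rough Python analogue (an illustrative sketch, not library code), where a monoid is modeled as a pair (identity, plus):

# a monoid is modeled as (identity_element, plus_function)
def prod(M, N):
    (e_m, plus_m) = M
    (e_n, plus_n) = N
    e = (e_m, e_n)
    def plus(a, b):
        return (plus_m(a[0], b[0]), plus_n(a[1], b[1]))
    return (e, plus)

# the product of the additive integer monoid with itself
# is exactly the (sum, count) monoid used earlier
add_monoid = (0, lambda x, y: x + y)
sum_count_monoid = prod(add_monoid, add_monoid)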
A functor between two monoids can also be defined as follows. Let (M1, f1, e1) and (M2, f2, e2) be monoids. A functor:
F : (M1, f1, e1) → (M2, f2, e2)
is specified by an object map (monoids are categories with a single object) and an arrow map F : M1 → M2, such that the following conditions hold:
∀a,b ∈ M1, F(f1(a,b)) = f2(F(a), F(b))
F(e1) = e2
A functor between two monoids is just a "monoid homomorphism." For example, for the String data type, the function Length(), which counts the number of letters in a word, is a monoid homomorphism:
Length("") = 0
(length of an empty string is 0)
If Length(x) = m
and Length(y) = n
, then concatenation
x + y
of strings has m + n
letters, for example:
Length("String" + "ology") = Length("Stringology") = 11 = 6 + 5 = Length("String") + Length("ology")
Therefore, creating monoids in mappers guarantees that the reducers can take advantage of combiners effectively and correctly.
Next, we take a look at the map-side join design pattern, which can reduce the time required to join two large data sets.
A join operation is used to combine rows from two (or more) tables, based on a common column (or columns) between them. A join is an expensive operation, and since big data engines (such as Hadoop and Spark) do not support indexing of data tables, it can be especially costly. Here, I introduce the map-side join, which can reduce the cost of a join between two tables. A map-side join is a process where two data sets are joined by the mapper alone, rather than by the actual join function (which requires a combination of mappers and reducers). The advantages of using a map-side join are as follows:
A map-side join might help in minimizing the cost incurred for sorting and merging in the shuffle and reduce stages.
A map-side join might help in improving the performance of the task by decreasing the time needed to finish it.
Using SQL terminology, a JOIN clause is used to combine rows from two (or more) tables, based on a related column between them. Typically, a SQL JOIN is an expensive operation (unless you have built proper indexes). In general, joins on large data sets are expensive, but very rarely do you want to join the entire contents of a large table A with the entire contents of a large table B.
To understand the map-side join, assume that we have two MySQL tables, EMP and DEPT, as defined below. Let's see how a join works between these two tables:
mysql> use testdb;
Database changed
mysql> select * from emp;
+--------+----------+---------+
| emp_id | emp_name | dept_id |
+--------+----------+---------+
|   1000 | alex     |      10 |
|   2000 | ted      |      10 |
|   3000 | mat      |      20 |
|   4000 | max      |      20 |
|   5000 | joe      |      10 |
+--------+----------+---------+
5 rows in set (0.00 sec)

mysql> select * from dept;
+---------+------------+---------------+
| dept_id | dept_name  | dept_location |
+---------+------------+---------------+
|      10 | ACCOUNTING | NEW YORK, NY  |
|      20 | RESEARCH   | DALLAS, TX    |
|      30 | SALES      | CHICAGO, IL   |
|      40 | OPERATIONS | BOSTON, MA    |
|      50 | MARKETING  | Sunnyvale, CA |
|      60 | SOFTWARE   | Stanford, CA  |
+---------+------------+---------------+
6 rows in set (0.00 sec)
Next, we join (as an INNER JOIN) these two tables on the dept_id key:
mysql> select e.emp_id, e.emp_name, e.dept_id,
    ->        d.dept_name, d.dept_location
    ->   from emp e, dept d
    ->  where e.dept_id = d.dept_id;
+--------+----------+---------+------------+---------------+
| emp_id | emp_name | dept_id | dept_name  | dept_location |
+--------+----------+---------+------------+---------------+
|   1000 | alex     |      10 | ACCOUNTING | NEW YORK, NY  |
|   2000 | ted      |      10 | ACCOUNTING | NEW YORK, NY  |
|   5000 | joe      |      10 | ACCOUNTING | NEW YORK, NY  |
|   3000 | mat      |      20 | RESEARCH   | DALLAS, TX    |
|   4000 | max      |      20 | RESEARCH   | DALLAS, TX    |
+--------+----------+---------+------------+---------------+
5 rows in set (0.00 sec)
A map-side join is similar to a regular join, but all the work is performed by the mapper alone (note that the results of an INNER JOIN and a map-side join must be identical).
Given two tables A and B, a map-side join is most suitable when table A is large and table B is small to medium. Since table B is not very big, we create a hash table from B and broadcast it to all nodes. Next, we iterate over all elements of table A with a mapper and look each element up in the broadcast hash table (which represents table B).
We create two RDDs. First, we create EMP as RDD[(dept_id, (emp_id, emp_name))]:
EMP = spark.sparkContext.parallelize(
[
 (10, (1000, 'alex')),
 (10, (2000, 'ted')),
 (20, (3000, 'mat')),
 (20, (4000, 'max')),
 (10, (5000, 'joe'))
])
Next, we create DEPT as RDD[(dept_id, (dept_name, dept_location))]:
DEPT = spark.sparkContext.parallelize(
[
 (10, ('ACCOUNTING', 'NEW YORK, NY')),
 (20, ('RESEARCH', 'DALLAS, TX')),
 (30, ('SALES', 'CHICAGO, IL')),
 (40, ('OPERATIONS', 'BOSTON, MA')),
 (50, ('MARKETING', 'Sunnyvale, CA')),
 (60, ('SOFTWARE', 'Stanford, CA'))
])
Now that both EMP and DEPT have a common key, dept_id, we can join them:
>>> sorted(EMP.join(DEPT).collect())
[
 (10, ((1000, 'alex'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((2000, 'ted'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (10, ((5000, 'joe'), ('ACCOUNTING', 'NEW YORK, NY'))),
 (20, ((3000, 'mat'), ('RESEARCH', 'DALLAS, TX'))),
 (20, ((4000, 'max'), ('RESEARCH', 'DALLAS, TX')))
]
How will the map-side join optimize the task? Here, I assume that EMP is a large data set and DEPT is a relatively small data set. Using a map-side join, to join EMP with DEPT on dept_id, we create a broadcast variable from the small table (this is a custom hash table builder from an RDD):
# build a dictionary of (key, value) pairs,
# where key = dept_id
#       value = (dept_name, dept_location)
def to_hash_table(dept_as_list):
    hash_table = {}
    for d in dept_as_list:
        dept_id = d[0]
        dept_name_location = d[1]
        hash_table[dept_id] = dept_name_location
    return hash_table
#end-def

dept_hash_table = to_hash_table(DEPT.collect())
Alternatively, you may build the same hash table by using the Spark action collectAsMap(), which returns the key-value pairs of this RDD (DEPT) to the master as a dictionary. Therefore, we can write:
dept_hash_table = DEPT.collectAsMap()
Now, using pyspark.SparkContext.broadcast, you can broadcast the read-only variable dept_hash_table to the Spark cluster, where it will be available to all kinds of transformations (including mappers and reducers):
sc = spark.sparkContext
hash_table_broadcasted = sc.broadcast(dept_hash_table)
Now, how do we perform the map-side join? In your mapper, you can access dept_hash_table via:
dept_hash_table = hash_table_broadcasted.value
Next, I show the map-side join by using a map() transformation:
joined = EMP.map(map_side_join)
Finally, map_side_join() is defined as:
# e is an element of the EMP RDD
def map_side_join(e):
    dept_id = e[0]
    # get the hash table from the broadcast object
    hash_table = hash_table_broadcasted.value
    dept_name_location = hash_table[dept_id]
    return (e, dept_name_location)
#end-def
Therefore, with a map-side join you avoid the shuffle, which can be very costly, and you avoid significant network I/O.
With a map-side join, we just used a map() function on each row of the EMP table and retrieved the dimension values (such as dept_name and dept_location) from the broadcast hash table. If the EMP table is very large, the map() function will be executed concurrently for each partition, where each partition has its own copy of the hash table. The map-side join approach allows us to avoid shuffling the large EMP table and gives quite good join performance.
Below, I have listed the advantages and disadvantages of using a map-side join:
The true join cost can be reduced, since we minimize the cost incurred for sorting and merging in the shuffle and reduce stages.
A map-side join improves the performance of the task by decreasing the time needed to finish the join operation.
The map-side join pattern is appropriate when one of the RDDs/tables on which you perform the join is small enough to fit into memory.
If neither RDD/table is small enough, then a map-side join is not suitable.
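Note that if you work with DataFrames rather than RDDs, Spark offers this pattern directly: the broadcast() function from pyspark.sql.functions hints to the optimizer that the smaller table should be broadcast to every executor. A minimal sketch, assuming emp_df and dept_df are hypothetical DataFrames built from the EMP and DEPT data shown earlier:

from pyspark.sql.functions import broadcast

# hint Spark to broadcast the small DEPT table; the join then
# happens map-side, without shuffling the large EMP table
joined_df = emp_df.join(broadcast(dept_df), on="dept_id")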
Next, I discuss another design pattern, join using Bloom filters, which can be used for efficient joining of two tables.
Given two RDDs, A = RDD[(K, V)] (the bigger RDD) and B = RDD[(K, W)] (the smaller RDD), Spark enables us to perform a join operation on the key K. Joining two RDDs is a common operation when working with Spark. In some cases, a join is used as a form of filtering: for example, you want to perform an operation on the subset of records in RDD[(K, V)] that is represented by the entities in another RDD[(K, W)]. You can use an inner join to achieve that effect, but you want to avoid the shuffle that the join operation introduces, especially if the RDD[(K, W)] you want to use for filtering is significantly smaller than the main RDD[(K, V)] on which you will perform your further computation.
You could do a broadcast join using a plain set constructed by collecting the smaller RDD you wish to filter by. But this means collecting the whole smaller RDD[(K, W)] in driver memory, and even if it is relatively small (several thousand or a few million records), that can still lead to undesirable memory pressure. If you want to avoid the shuffle that the join operation introduces, you may use a Bloom filter instead.
With this approach, the problem of joining RDD[(K, V)] with RDD[(K, W)] is reduced to a simple map() transformation, in which we check each key K against the Bloom filter constructed from the smaller RDD[(K, W)].
A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton H. Bloom in 1970, that is used to test whether an element is a member of a set. False positive matches are possible, but false negatives are not: a query returns either "possibly in the set" or "definitely not in the set." Elements can be added to the set, but not removed (though this can be addressed with a "counting" filter). The more elements that are added to the set, the larger the probability of false positives. In other words, the Bloom filter may return true for elements that are not actually members of the set (false positive errors), but it will never return false for an element that is in the set: for each element in the set, the Bloom filter must return true.
Therefore, in a nutshell, we can summarize the Bloom filter's properties as follows:
Given a big set S = {x1, x2, ..., xn}, a Bloom filter is a probabilistic, fast, and space-efficient cache builder for a big data set; it basically approximates the set membership operation: is x in S?
It tries to answer the lookup question: does item x exist in set S?
It allows false positive errors, as they only cost us an extra data set access. This means that for some x that is not in the set, the Bloom filter might indicate that x is in the set.
It does not allow false negative errors, because they would result in wrong answers. This means that if x is in the set, then the Bloom filter will certainly indicate that x is in the set. Thus, the Bloom filter allows no false negatives, but can allow false positives.
Let's focus on a simple join example between two relations or tables: say that we want to join R = RDD[(K, V)] and S = RDD[(K, W)] on a common key K. Further assume that:

count(R) = 1,000,000,000 (the larger data set)
count(S) =    10,000,000 (the smaller data set)
To do a naive join, we would need to check 10^16 record pairs (1,000,000,000 × 10,000,000), which is a huge and time-consuming process. One idea to reduce the time and complexity of the join between R and S is to build a Bloom filter from relation S (the smaller data set) and then apply the built Bloom filter to relation R. This can eliminate the unneeded records from R (it might reduce it to 20,000,000 records), so the join becomes fast and efficient.
Next, we semi-formalize the Bloom filter data structure: what is involved in this probabilistic data structure, how do we construct one, what is the probability of false positive errors, and how can we decrease that probability? This is how it works:
Given a set S = {x1, x2, ..., xn}:

Let B be an m-bit array (m > 1), initialized with 0s. B's elements are B[0], B[1], ..., B[m-1]. The memory required for array B is only a fraction of that needed for storing the whole set S. Selecting a bigger bit vector (array B) decreases the false positive rate.

Let {H1, H2, ..., Hk} be a set of k hash functions. If Hi(xj) = a, then set B[a] = 1. You may use SHA-1, MD5, or MurmurHash as hash functions; for example, you may use:

Hi(x) = MD5(x + i)
Hi(x) = MD5(x || i)

To check whether x is in S, check B at the positions Hi(x) for all i. All k values must be 1.

It is possible to have a false positive: all k values are 1, but x is not in S.
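As an illustration of the Hi(x) = MD5(x + i) idea (a sketch of one possible implementation, not a library API), the k array positions for an item x can be computed in Python as:

import hashlib

# derive k positions in the m-bit array B for item x, using H_i(x) = MD5(x + i)
def bloom_positions(x, k, m):
    return [int(hashlib.md5((x + str(i)).encode()).hexdigest(), 16) % m
            for i in range(k)]

# example: the 3 bits to set/check for item "key1" in a 10-bit array
positions = bloom_positions("key1", 3, 10)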
The probability of false positives is (for details, see Wikipedia).
\left(1 - \left[1 - \frac{1}{m}\right]^{kn}\right)^{k} \approx \left(1 - e^{-kn/m}\right)^{k}
What are the optimal hash functions, and what is the optimal number of hash functions? For a given m (the number of bits selected for the Bloom filter) and n (the size of the big data set), the value of k (the number of hash functions) that minimizes the probability of false positives is (ln stands for "natural logarithm"):
k = \frac{m}{n} \ln 2

and, for a target false positive probability p, the required number of bits is:

m = -\frac{n \ln p}{(\ln 2)^2}
The probability that a specific bit has been set to 1 is:
1 - \left(1 - \frac{1}{m}\right)^{kn} \approx 1 - e^{-kn/m}
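As a worked example, suppose the smaller relation has n = 10,000,000 keys and we can tolerate a false positive probability of p = 0.01. Then:

m = -n ln(0.01) / (ln 2)^2 ≈ 9.59 × 10,000,000 ≈ 95,900,000 bits ≈ 12 MB
k = (m / n) ln 2 ≈ 9.59 × 0.693 ≈ 7 hash functions

so the whole filter easily fits in a broadcast variable.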
Next, let’s take a look at a Bloom filter example.
This example shows how to insert and query on a Bloom filter of size 10 (m = 10) with three hash functions H = {H1, H2, H3}; let H(x) denote the result of these three hash functions. We start with a 10-bit array B initialized to 0:
Array B, initialized:
  index  0 1 2 3 4 5 6 7 8 9
  value  0 0 0 0 0 0 0 0 0 0

Insert element a, H(a) = (2, 5, 6):
  index  0 1 2 3 4 5 6 7 8 9
  value  0 0 1 0 0 1 1 0 0 0

Insert element b, H(b) = (1, 5, 8):
  index  0 1 2 3 4 5 6 7 8 9
  value  0 1 1 0 0 1 1 0 1 0

Query element c: H(c) = (5, 8, 9) => c is not a member (since B[9] = 0)
Query element d: H(d) = (2, 5, 8) => d is reported as a member (a false positive)
Query element e: H(e) = (1, 2, 6) => e is reported as a member (a false positive)
Query element f: H(f) = (2, 5, 6) => f is reported as a member (also a false positive: f was never inserted, it merely collides with a)
The following code segment shows how to create and use a Bloom filter in Python with the bloom_filter package (you may roll your own Bloom filter library, but if you find a suitable library, use it):
# instantiate BloomFilter with custom settings
>>> from bloom_filter import BloomFilter
>>> bloom = BloomFilter(max_elements=100000, error_rate=0.01)

# test whether the Bloom filter has seen a key:
>>> "test-key" in bloom
False

# mark the key as seen
>>> bloom.add("test-key")

# now check again
>>> "test-key" in bloom
True
A Bloom filter is a small, compact, and fast data structure for set membership. It can be used in the join of two RDDs/relations/tables, such as R(K, V) and S(K, W), where one relation has a huge number of records (for example, R with 1,000,000,000 records) and the other has a small number of records (for example, S with 10,000,000 records). A full join on the key K between R and S would take a long time and would be inefficient.
We can use the Bloom filter data structure in the following way: build a Bloom filter from relation S(K, W), and then test the records of R(K, V) for membership using the built Bloom filter (via Spark's broadcast mechanism). Note that, as a reduce-side join optimization, we apply the Bloom filter in the map tasks, which reduces the I/O cost of the PySpark job. How do we use the Bloom filter concept in mappers? The following steps show how to use a Bloom filter (a representation of S) in the mappers (over R), as a substitute for the join operation between R and S:
Construct the Bloom filter: this is a small PySpark code segment, which uses the smaller of the two relations/tables to build the Bloom filter data structure. First, initialize the Bloom filter (create an instance of BloomFilter), then use BloomFilter.add() to populate it. Let's call the built Bloom filter the_bloom_filter (see the sketch after this list).
Use SparkContext.broadcast() to broadcast the built Bloom filter (the_bloom_filter) to all worker nodes (so that it is available to all Spark transformations, including mappers).
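A minimal sketch of the construction step, assuming S is the smaller RDD[(K, W)] and using the bloom_filter package shown earlier:

from bloom_filter import BloomFilter

# collect only the keys of the smaller relation S = RDD[(K, W)]
keys_of_S = S.keys().collect()

# initialize and populate the Bloom filter
the_bloom_filter = BloomFilter(max_elements=len(keys_of_S), error_rate=0.01)
for key in keys_of_S:
    the_bloom_filter.add(key)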
# broadcast the Bloom filter to all worker nodes for read-only access
sc = spark.sparkContext
broadcasted_bloom_filter = sc.broadcast(the_bloom_filter)
Now, we can use the Bloom filter to discard the unneeded elements:
# e is an element of R = RDD[(K, V)]
def bloom_filter_function(e):
    # get the Bloom filter from the broadcast object
    # (note: .value is a property, not a method)
    the_bloom_filter = broadcasted_bloom_filter.value
    # check the key of element e against the Bloom filter
    key = e[0]
    if key in the_bloom_filter:
        return True
    else:
        return False
#end-def
We use bloom_filter_function() on R = RDD[(K, V)] to keep an element if and only if its key is (probably) in S = RDD[(K, W)]:
# R = RDD[(K, V)]
# joined = RDD[(K, V)] where K is in S = RDD[(K, W)]
joined = R.filter(bloom_filter_function)
MapReduce design patterns are common patterns in data-related problem solving. These design patterns enable us to solve similar data problems in an efficient manner.
Some of the MapReduce design patterns are:
Summarization patterns: get a top-level view by summarizing and grouping data
Filtering patterns: view data subsets by using predicates
Data organization patterns: reorganize data to work with other systems, or to make MapReduce analysis easier
Join patterns: analyze different datasets together to discover interesting relationships
Meta patterns: piece together several patterns to solve multi-stage problems, or to perform several analytics in the same job
Input and output patterns: customize the way you use persistent store (such as HDFS and S3) to load or store data
Some of the patterns discussed in this chapter are:
Word Count
MinMax Count
Summarization
Map-side Join
Join with Bloom filter