© Raju Kumar Mishra 2018
Raju Kumar Mishra, PySpark Recipes, https://doi.org/10.1007/978-1-4842-3141-8_5

5. The Power of Pairs: Paired RDDs

Raju Kumar Mishra
(1) Bangalore, Karnataka, India
Key/value pairs are good for solving many problems efficiently in a parallel fashion. Apache Mahout, a machine-learning library that was initially developed on top of Apache Hadoop, implements many machine-learning algorithms in the areas of classification, clustering, and collaborative filtering by using the MapReduce key/value-pair architecture . In this chapter, you’ll work through recipes that develop skills for solving interesting big data problems from many disciplines.
This chapter covers the following recipes:
  • Recipe 5-1. Create a paired RDD
  • Recipe 5-2. Perform data aggregation
  • Recipe 5-3. Join data
  • Recipe 5-4. Calculate page rank

Recipe 5-1. Create a Paired RDD

Problem

You want to create a paired RDD.

Solution

You have an RDD, RDD1, whose elements are b, d, m, t, e, and u. You want to create a paired RDD in which the keys are the elements of RDD1 and the value for each key is 0 if the element is a consonant, or 1 if it is a vowel. Figure 5-1 depicts the requirement.
Figure 5-1.
Creating a paired RDD
A paired RDD can be created in many ways. One way is to read data directly from files. We’ll explore this method in an upcoming chapter. Another way to create a paired RDD is by using the map() method, which you’ll learn about in this recipe.

How It Works

In this section, you’ll follow several steps to reach the solution.

Step 5-1-1. Creating an RDD with Single Elements

Let’s start by creating an RDD out of our given data:
>>> pythonList = ['b', 'd', 'm', 't', 'e', 'u']
>>> RDD1 = sc.parallelize(pythonList, 2)
>>> RDD1.collect()
Here is the output:
['b', 'd', 'm', 't', 'e', 'u']
We have created an RDD named RDD1 whose elements are the letters b, d, m, t, e, and u. The elements b, d, m, and t are consonants; the remaining elements, e and u, are vowels.

Step 5-1-2. Writing a Python Method to Check for Consonants

We are going to define a Python function named vowelCheckFunction(). This function takes a letter as input and returns 1 if the input is a vowel, or 0 if it is a consonant. Let's implement the function:
>>> def vowelCheckFunction( data) :
...     if data in ['a','e','i','o','u']:
...        return 1
...     else :
...        return 0
Our Python function vowelCheckFunction() takes a single input, data; in this case, we pass it a letter. Inside the function, we check whether data is a vowel. If the if block's logical expression evaluates to True, the function returns 1; otherwise, it returns 0.
Without testing vowelCheckFunction, we shouldn’t trust it. So let’s start with a vowel as input:
>>> vowelCheckFunction('a')
Here is the output:
1
We get 1 as the output for our vowel input. Our Python function vowelCheckFunction meets our expectation for vowels. That’s nice, but let’s test for consonants too. This time we are going to send b, a consonant, as the input:
>>> vowelCheckFunction('b')
Here is the output:
0
We have tested our function, and it returns the anticipated output for both consonants and vowels, so we can rely on it.

Step 5-1-3. Creating a Paired RDD

We can create our required RDD by using the map() function. We have to create a paired RDD: the keys will be the elements of RDD1, and the value will be 0 for keys that are consonants, or 1 for keys that are vowels:
>>> RDD2 = RDD1.map( lambda data : (data, vowelCheckFunction(data)))
>>> RDD2.collect()
Here is the output:
[('b', 0),
 ('d', 0),
 ('m', 0),
 ('t', 0),
 ('e', 1),
 ('u', 1)]

Step 5-1-4. Fetching Keys from a Paired RDD

The keys() function can be used to fetch all the keys:
>>> RDD2Keys = RDD2.keys()
The following code line gets all the keys:
>>> RDD2Keys.collect()
Here is the output:
['b', 'd', 'm', 't', 'e', 'u']
Note that keys() is a transformation: it returns an RDD, so we need the collect() action to bring the data back to the driver.
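Because keys() returns an RDD, it can be chained with further transformations before a single action triggers the computation. Here is a minimal sketch, assuming the same RDD2 as above:
>>> RDD2.keys().distinct().count()
Here is the output:
6
Both the keys() and distinct() transformations are evaluated only when the count() action runs.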

Step 5-1-5. Fetching Values from a Paired RDD

Similar to the keys() function, the values() function will fetch all the values from a paired RDD. It also performs a transformation:
>>> RDD2Values = RDD2.values()
>>> RDD2Values.collect()
Here is the output:
[0, 0, 0, 0, 1, 1]

Recipe 5-2. Aggregate Data

Problem

You want to aggregate data.

Solution

You want to perform data aggregation on data from a lightbulb manufacturer, as shown in Table 5-1.
Table 5-1.
Filament Data
Company YP manufactures two types of filaments: filamentA and filamentB. Both 100W and 200W electric bulbs are manufactured from each filament type. Table 5-1 indicates the expected life (in hours) of each bulb.
You want to calculate the following:
  • Mean life in hours for bulbs of each filament type
  • Mean life in hours for bulbs of each power level
  • Mean life in hours based on both filament type and power
We generally encounter aggregation of data in data-science problems. To get an aggregation of data, we can use many PySpark functions.
In this recipe, we’ll use the reduceByKey() function to calculate the mean by using keys. Calculating the mean of complex keys requires creating those complex keys. Complex keys can be created by using the map() function.
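As a small preview of the pattern this recipe builds, here is a hedged sketch of a per-key mean computed with map() and reduceByKey() on toy data (the names toy and toySumCount are illustrative only, and the output order may vary):
>>> toy = sc.parallelize([('k1', 2.0), ('k1', 4.0), ('k2', 6.0)], 2)
>>> toySumCount = toy.map(lambda kv : (kv[0], [kv[1], 1])).reduceByKey(
...     lambda l1,l2 : [l1[0]+l2[0], l1[1]+l2[1]])
>>> toySumCount.map(lambda kv : (kv[0], float(kv[1][0])/kv[1][1])).collect()
Here is the output:
[('k1', 3.0), ('k2', 6.0)]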

How It Works

Let’s start with the creation of the RDD.

Step 5-2-1. Creating an RDD with Single Elements

>>> filDataSingle = [['filamentA','100W',605],
...                  ['filamentB','100W',683],
...                  ['filamentB','100W',691],
...                  ['filamentB','200W',561],
...                  ['filamentA','200W',530],
...                  ['filamentA','100W',619],
...                  ['filamentB','100W',686],
...                  ['filamentB','200W',600],
...                  ['filamentB','100W',696],
...                  ['filamentA','200W',579],
...                  ['filamentA','200W',520],
...                  ['filamentA','100W',622],
...                  ['filamentA','100W',668],
...                  ['filamentB','200W',569],
...                  ['filamentB','200W',555],
...                  ['filamentA','200W',541]]
>>> filDataSingleRDD = sc.parallelize(filDataSingle,2)
>>> filDataSingleRDD.take(3)
Here is the output:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691]]
Here we first create a nested Python list of filament data named filDataSingle. We then create the RDD filDataSingleRDD, dividing the data into two partitions. The output of take() on filDataSingleRDD clearly shows that each element of the RDD is a list.
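We can confirm the number of partitions directly, as a quick check on the same filDataSingleRDD:
>>> filDataSingleRDD.getNumPartitions()
Here is the output:
2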

Step 5-2-2. Creating a Paired RDD

First we have to calculate the mean lifetime of bulbs based on their filament type. It is best to create a paired RDD whose keys are the filament types and whose values are the lifetimes in hours. So let's create the required paired RDD and then investigate it:
>>> filDataPairedRDD1 = filDataSingleRDD.map(lambda x : (x[0], x[2]))
>>> filDataPairedRDD1.take(4)
Here is the output:
[('filamentA', 605),
 ('filamentB', 683),
 ('filamentB', 691),
 ('filamentB', 561)]
We have created a paired RDD, filDataPairedRDD1, by using the map() function defined on the RDD. The paired RDD filDataPairedRDD1 has the filament type as the key, and the life in hours as the value.

Step 5-2-3. Finding the Mean Lifetime Based on Filament Type

Now we have our required paired RDD. But is this all we need? No. To calculate the mean, we need both a sum and a count, so we have to add an extra 1 to the value of each element of our paired RDD. Let's do that now:
>>> filDataPairedRDD11 = filDataPairedRDD1.map(lambda x : (x[0], [x[1], 1]))
>>> filDataPairedRDD11.take(4)
Here is the output:
[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1])]
filDataPairedRDD11 is a paired RDD. The values of filDataPairedRDD11 are presented as a list; the first element is the lifetime of the bulb (in hours), and the second element is just 1.
Now we have to calculate the sum of the values of the lifetimes for each filament type as well as the count value, so that we can calculate the mean. Many PySpark functions could be used to do this job, but here we are going to use the reduceByKey() function for paired RDDs.
The reduceByKey() function applies an aggregation operation key-wise. It takes an aggregation function as input and applies that function, pairwise, to the values of each key.
Let's calculate the sum of the total life hours of bulbs based on the filament type, along with the count of elements for each filament type:
>>> filDataSumandCount = filDataPairedRDD11.reduceByKey(lambda l1,l2 : [l1[0] + l2[0], l1[1]+l2[1]])
>>> filDataSumandCount.collect()
Here is the output:
[('filamentA', [4684, 8]),
 ('filamentB', [5041, 8])]
We are applying the reduceByKey() function to our paired RDD, filDataPairedRDD11. To understand how reduceByKey() works, let's see how our paired RDD has been distributed, starting with the number of elements in filDataPairedRDD11:
>>> filDataPairedRDD11.count()
Here is the output:
16
Our paired RDD has 16 elements. Now let’s see how many partitions our data has:
>>> filDataPairedRDD11.getNumPartitions()
Here is the output:
2
It is clear that the data has been distributed in two parts. PySpark tries to distribute data evenly across partitions, so each partition gets eight data points. Now let's take five data points out of our RDD and see what that data looks like:
>>> filDataPairedRDD11.take(5)
Here is the output:
[('filamentA', [605, 1]),
 ('filamentB', [683, 1]),
 ('filamentB', [691, 1]),
 ('filamentB', [561, 1]),
 ('filamentA', [530, 1])]
We can see that the first and fifth elements of our filDataPairedRDD11 RDD have the key filamentA. Therefore, the first l1 and l2 passed to our reduceByKey() function for filament type filamentA will be [605, 1] and [530, 1], respectively.
>>> filDataSumandCount.collect()
Here is the output:
[('filamentA', [4684, 8]),
 ('filamentB', [5041, 8])]
Finally, we have the summation of the life hours of bulbs and the count, based on filament type. The next step is to divide the sum by the count to get the mean value. Let’s do that:
>>> filDataMeanandCount = filDataSumandCount.map( lambda l : [l[0],float(l[1][0])/l[1][1],l[1][1]])
>>> filDataMeanandCount.collect()
Here is the output:
[['filamentA', 585.5, 8],
 ['filamentB', 630.125, 8]]
Finally, we have our required mean, based on filament type. The mean lifetime of filamentA is 585.5 hours, and the mean lifetime of filamentB is 630.125 hours. We can infer that filamentB has a longer life than filamentA.

Step 5-2-4. Finding the Mean Lifetime Based on Bulb Power

First, we will start with creating our paired RDD. The key will be the bulb power, and the value will be the life in hours:
>>> filDataPairedRDD2 = filDataSingleRDD.map(lambda x : (x[1], x[2]))
>>> filDataPairedRDD2.take(4)
Here is the output:
[('100W', 605),
 ('100W', 683),
 ('100W', 691),
 ('200W', 561)]
We have created a paired RDD, filDataPairedRDD2, in which each element is a pair of the bulb power and the corresponding life in hours.
>>> fillDataPairedRDD22 = filDataPairedRDD2.map( lambda x : (x[0],[x[1],1]))
>>> fillDataPairedRDD22.take(4)
Here is the output:
[('100W', [605, 1]),
 ('100W', [683, 1]),
 ('100W', [691, 1]),
 ('200W', [561, 1])]
Now we have included 1 in the value part of the RDD. Therefore, each value is a list that consists of the life in hours and a 1.
>>> powerSumandCount = fillDataPairedRDD22.reduceByKey(lambda l1,l2 : [l1[0]+l2[0], l1[1]+l2[1]])
>>> powerSumandCount.collect()
Here is the output:
[('100W', [5270, 8]),
 ('200W', [4455, 8])]
We have calculated the sum of the total life hours and the count, with bulb power as the key.
>>> meanandCountPowerWise = powerSumandCount.map(lambda val : [val[0],[float(val[1][0])/val[1][1],val[1][1]]])
>>> meanandCountPowerWise.collect()
Here is the output:
[['100W', [658.75, 8]], ['200W', [556.875, 8]]]
In this last step, we have computed the mean and the count. From the result, we can infer that the mean life of 100W bulbs is longer than that of 200W bulbs.

Step 5-2-5. Finding the Mean Lifetime Based on Filament Type and Power

To solve this part of the exercise, we need a paired RDD with keys that are complex. You might be wondering what a complex key is. A complex key is made up of more than one component. In our case, the complex key combines the filament type and the bulb power. Let's start creating our paired RDD with a complex key:
>>> filDataSingleRDD.take(4)
Here is the output:
[['filamentA', '100W', 605],
 ['filamentB', '100W', 683],
 ['filamentB', '100W', 691],
 ['filamentB', '200W', 561]]
>>> filDataComplexKeyData = filDataSingleRDD.map( lambda val : [(val[0], val[1]),val[2]])
>>> filDataComplexKeyData.take(4)
Here is the output:
[[('filamentA', '100W'), 605],
 [('filamentB', '100W'), 683],
 [('filamentB', '100W'), 691],
 [('filamentB', '200W'), 561]]
We have created a paired RDD named filDataComplexKeyData. It can be easily observed that it has complex keys. The keys are a combination of filament type and bulb power. The rest of the exercise will move as in the previous step. In the following code, we are going to include an extra 1 in the values:
>>> filDataComplexKeyData1 = filDataComplexKeyData.map(lambda val : [val[0], [val[1],1]])
>>> filDataComplexKeyData1.take(4)
Here is the output:
[[('filamentA', '100W'), [605, 1]],
 [('filamentB', '100W'), [683, 1]],
 [('filamentB', '100W'), [691, 1]],
 [('filamentB', '200W'), [561, 1]]]
Our required paired RDD, filDataComplexKeyData1, has been created. Now we can apply the reduceByKey() function to get the sum and count, based on the complex keys:
>>> filDataComplexKeySumCount = filDataComplexKeyData1.reduceByKey(lambda l1,l2 : [l1[0]+l2[0], l1[1]+l2[1]])
>>> filDataComplexKeySumCount.collect()
Here is the output:
[(('filamentA', '100W'), [2514, 4]),
 (('filamentB', '200W'), [2285, 4]),
 (('filamentB', '100W'), [2756, 4]),
 (('filamentA', '200W'), [2170, 4])]
After getting the sum, the mean can be calculated as follows:
>>> filDataComplexKeyMeanCount = filDataComplexKeySumCount.map(lambda val : [val[0],[float(val[1][0])/val[1][1],val[1][1]]])
>>> filDataComplexKeyMeanCount.collect()
Here is the output:
[[('filamentA', '100W'), [628.5, 4]],
 [('filamentB', '200W'), [571.25, 4]],
 [('filamentB', '100W'), [689.0, 4]],
 [('filamentA', '200W'), [542.5, 4]]]
Finally, we have calculated the mean and the count value.

Recipe 5-3. Join Data

Problem

You want to join data.

Solution

We have been given two tables: a Students table (Table 5-2) and a Subjects table (Table 5-3).
Table 5-2.
Students
Table 5-3.
Subjects
You want to perform the following on the Students and Subjects tables:
  • Inner join
  • Left outer join
  • Right outer join
  • Full outer join
Joining data tables is an integral part of data preprocessing. We are going to perform four types of data joins in this recipe.
An inner join returns all the keys that are common to both tables. It discards the key elements that are not common to both tables. In PySpark, an inner join is done by using the join() method defined on the RDD.
A left outer join includes all keys in the left table and excludes uncommon keys from the right table. A left outer join can be performed by using the leftOuterJoin() function defined on the RDD in PySpark.
Another important type of join is a right outer join. In a right outer join, every key of the second table is included, but from the first table, only those keys that are common to both tables are included. We can do a right outer join by using the rightOuterJoin() function in PySpark.
If you want to include all keys from both tables, go for a full outer join. It can be performed by using fullOuterJoin().
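As a minimal sketch of the four join types before we work on the real data, consider two toy paired RDDs (leftRDD and rightRDD are illustrative names only, and the output order may vary):
>>> leftRDD = sc.parallelize([('k1', 'L1'), ('k2', 'L2')])
>>> rightRDD = sc.parallelize([('k2', 'R2'), ('k3', 'R3')])
>>> leftRDD.join(rightRDD).collect()             # only the common key k2 survives
Here is the output:
[('k2', ('L2', 'R2'))]
>>> leftRDD.fullOuterJoin(rightRDD).collect()    # all three keys are kept
Here is the output:
[('k1', ('L1', None)), ('k2', ('L2', 'R2')), ('k3', (None, 'R3'))]
The leftOuterJoin() and rightOuterJoin() functions behave analogously, keeping all keys from the left or right RDD, respectively, and filling the missing side with None.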

How It Works

We’ll follow the steps in this section to work with joins.

Step 5-3-1. Creating Nested Lists

Let’s start creating a nested list of our data from the Students table:
>>> studentData = [['si1','Robin','M'],
...                ['si2','Maria','F'],
...                ['si3','Julie','F'],
...                ['si4','Bob',  'M'],
...                ['si6','William','M']]
After creating our Students table data, the next step is to create a nested list of Subjects table data:
>>> subjectsData = [['si1','Python'],
...                 ['si3','Java'],
...                 ['si1','Java'],
...                 ['si2','Python'],
...                 ['si3','Ruby'],
...                 ['si4','C++'],
...                 ['si5','C'],
...                 ['si4','Python'],
...                 ['si2','Java']]
We have created nested lists from the Students table and Subjects table.

Step 5-3-2. Creating a Paired RDD of Students and Subjects

Before creating a paired RDD, we first have to create a single RDD. Let’s create studentRDD:
>>> studentRDD = sc.parallelize(studentData, 2)
>>> studentRDD.take(4)
Here is the output:
[['si1', 'Robin', 'M'],
 ['si2', 'Maria', 'F'],
 ['si3', 'Julie', 'F'],
 ['si4', 'Bob', 'M']]
We can see that every element of the studentRDD RDD is a list, and each list has three elements. Now we have to transform it into a paired RDD:
>>> studentPairedRDD = studentRDD.map(lambda val : (val[0],[val[1],val[2]]))
>>> studentPairedRDD.take(4)
Here is the output:
[('si1', ['Robin', 'M']),
 ('si2', ['Maria', 'F']),
 ('si3', ['Julie', 'F']),
 ('si4', ['Bob', 'M'])]
The paired RDD, studentPairedRDD, has the student ID as the key. Now we have to create a paired RDD of the subject data:
>>> subjectsPairedRDD = sc.parallelize(subjectsData, 2)
>>> subjectsPairedRDD.take(4)
Here is the output:
[['si1', 'Python'],
 ['si3', 'Java'],
 ['si1', 'Java'],
 ['si2', 'Python']]
We do not need to do anything extra to create a paired RDD of subject data.

Step 5-3-3. Performing an Inner Join

As we know, an inner join in PySpark is done by using the join() function. We have to apply this function on the paired RDD studentPairedRDD, and provide subjectsPairedRDD as an argument to the join() function:
>>> studenSubjectsInnerJoin = studentPairedRDD.join(subjectsPairedRDD)
>>> studenSubjectsInnerJoin.collect()
Here is the output:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
Analyzing the output of this inner join reveals that only keys common to both the Students and Subjects tables appear in the joined table. Keys that are not common to both tables are not part of the joined table.

Step 5-3-4. Performing a Left Outer Join

A left outer join can be performed by using the leftOuterJoin() function:
>>> studentSubjectsleftOuterJoin = studentPairedRDD.leftOuterJoin(subjectsPairedRDD)
>>> studentSubjectsleftOuterJoin.collect()
Here is the output:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
Student ID si6 is in the Students table but not in the Subjects table. Hence, the left outer join includes si6 in the joined table. Because si6 doesn’t have its counterpart in the Subjects table, it has None in place of the subject.

Step 5-3-5. Performing a Right Outer Join

A right outer join on the Students and Subjects tables can be performed by using the rightOuterJoin() function:
>>> studentSubjectsrightOuterJoin = studentPairedRDD.rightOuterJoin(subjectsPairedRDD)
>>> studentSubjectsrightOuterJoin.collect()
Here is the output:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si5', (None, 'C')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
Student ID si5 is in only the Subjects table; it is not part of the Students table. Therefore, it appears in the joined table with None in place of the student data.

Step 5-3-6. Performing a Full Outer Join

Now let’s perform a full outer join. In a full outer join, keys from both tables will be included:
>>> studentSubjectsfullOuterJoin = studentPairedRDD.fullOuterJoin(subjectsPairedRDD)
>>> studentSubjectsfullOuterJoin.collect()
Here is the output:
[('si3', (['Julie', 'F'], 'Java')),
 ('si3', (['Julie', 'F'], 'Ruby')),
 ('si2', (['Maria', 'F'], 'Python')),
 ('si2', (['Maria', 'F'], 'Java')),
 ('si6', (['William', 'M'], None)),
 ('si1', (['Robin', 'M'], 'Python')),
 ('si1', (['Robin', 'M'], 'Java')),
 ('si5', (None, 'C')),
 ('si4', (['Bob', 'M'], 'C++')),
 ('si4', (['Bob', 'M'], 'Python'))]
In the joined table, keys from both tables have been included. Student ID si6 is part of only the Students data, and it appears in the joined table. Similarly, student ID si5 is part of only the Subjects table, but it appears in our joined table.

Recipe 5-4. Calculate Page Rank

Problem

You want to calculate the page rank of the web-page system illustrated in Figure 5-2.
Figure 5-2.
A web-page system
We have four web pages (a, b, c, and d) in our system. Web page a has outbound links to pages b, c, and d. Page b has outbound links to pages c and d. Web page c has an outbound link to page b, and page d has outbound links to pages a and c.

Solution

The page-rank algorithm was developed by Sergey Brin and Larry Page. In the algorithm name, page stands for Larry Page. The developers of the page-rank algorithm later founded Google. It is an iterative algorithm.
The page rank of a particular web page indicates its relative importance within a group of web pages. The higher the page rank, the higher up it will appear in a search result list.
The importance of a page is defined by the importance of all the web pages that provide an outbound link to the page under consideration. For example, say that web page X has very high relative importance and has an outbound link to web page Y; web page Y will then also gain importance.
Let’s summarize the page-rank algorithm:
  1. Initialize each page with a page rank of 1 or some arbitrary number.
  2. For i in someSequence, do the following:
    a. Calculate the contribution of each inbound page.
    b. Calculate the sum of the contributions.
    c. Determine the updated page rank as follows:
       updated page rank = 1 – s + s × summation of contributions
Let me show you an example. Say PR(a), PR(b), PR(c), and PR(d) are the page ranks of pages a, b, c, and d, respectively. Page d has two inbound links: the first from page a, and the second from page b.
Now we have to know how the contribution to page rank by a web page is calculated. This contribution is given by the following formula:
contribution to a page = (page rank of the contributing page) / (total number of outbound links from the contributing page)
In our example, web page a has three outbound links: the first is to page b, the second is to page c, and the last is to page d. So the contribution to page rank of page d by page a is PR(a) / 3. Now we have to calculate the contribution of page b to page d. Page b has two outbound links: the first to page c, and the second to page d. Hence, the contribution by page b is PR(b) / 2.
So the page rank of page d will be updated as follows, where s is known as the damping factor :
PR(d) = 1 – s + s × (PR(a)/3 + PR(b)/2)
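As a quick sanity check of this formula, assume every page starts with a rank of 1 and use the damping factor s = 0.85 that we adopt later in this recipe; the first update of PR(d) then works out to roughly 0.858:
>>> s = 0.85
>>> updatedPRd = (1 - s) + s * (1.0/3 + 1.0/2)    # 0.15 + 0.85 * 0.8333... ≈ 0.858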
Note
For more information on page rank, visit Wikipedia at https://en.wikipedia.org/wiki/PageRank .

How It Works

Follow the steps in this section to work with page rank.

Step 5-4-1. Creating Nested Lists of Web Pages with Outbound Links and Initializing Rank

>>> pageLinks = [['a', ['b','c','d']],
...              ['c', ['b']],
...              ['b', ['d','c']],
...              ['d', ['a','c']]]
We have created a nested list of web pages and their outbound links. Now we have to initialize the page ranks of all the pages; we initialize each to 1:
>>> pageRanks =  [['a',1],
...               ['c',1],
...               ['b',1],
...               ['d',1]]
After creating the nested list of ranks, we will also need to define the number of iterations for running the page-rank loop; we do that in Step 5-4-4.

Step 5-4-2. Writing a Function to Calculate Contributions

We are going to write a function that takes two arguments. The first argument is a list of web-page URIs that receive outbound links from a contributing page. The second argument is the rank of that contributing page. The function will return the contribution made to each web page in the first argument:
>>> def rankContribution(uris, rank):
...     numberOfUris = len(uris)
...     rankContribution = float(rank) / numberOfUris
...     newrank =[]
...     for uri in uris:
...       newrank.append((uri, rankContribution))
...     return newrank
This code is nearly self-explanatory. Our function, rankContribution(), returns the page-rank contribution made to each URI in the list uris (the first argument). The function first calculates the number of elements in uris, then computes the rank contribution each URI receives, and finally returns a list of (URI, contribution) pairs.
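A quick check on page a's outbound links, using page a's initial rank of 1, shows the expected equal split:
>>> rankContribution(['b', 'c', 'd'], 1)
Here is the output:
[('b', 0.3333333333333333), ('c', 0.3333333333333333), ('d', 0.3333333333333333)]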

Step 5-4-3. Creating Paired RDDs

Let’s first create our paired RDDs of link data:
>>> pageLinksRDD  = sc.parallelize(pageLinks, 2)
>>> pageLinksRDD.collect()
Here is the output:
[['a', ['b', 'c', 'd']],
 ['c', ['b']],
 ['b', ['d', 'c']],
 ['d', ['a', 'c']]]
And then we’ll create the paired RDD of our rank data:
>>> pageRanksRDD  = sc.parallelize(pageRanks, 2)
>>> pageRanksRDD.collect()
Here is the output:
[['a', 1],
 ['c', 1],
 ['b', 1],
 ['d', 1]]

Step 5-4-4. Creating a Loop for Updating Page Rank

Now it is time to write our final loop to update the page rank of every page:
>>> numIter = 20
>>> s = 0.85
We have defined the number of iterations and the damping factor, s.
>>> for i in range(numIter):
...     linksRank = pageLinksRDD.join(pageRanksRDD)
...     contributedRDD = linksRank.flatMap(lambda x : rankContribution(x[1][0],x[1][1]))
...     sumRanks = contributedRDD.reduceByKey(lambda v1,v2 : v1+v2)
...     pageRanksRDD = sumRanks.map(lambda x : (x[0],(1-s)+s*x[1]))
...
Let’s investigate our for loop . First we join pageLinksRDD to pageRanksRDD via an inner join. Then the second line of the for block calculates the contribution to the page rank by using the rankContribution() function we defined previously.
In the next line, we aggregate all the contributions we have. In the last line, we update the rank of each web page by using the map() function. Now it is time to enjoy the result:
>>> pageRanksRDD.collect()
Here is the output:
[('b', 1.3572437951279825),
 ('c', 1.2463781024360086),
 ('d', 0.8746512999550939),
 ('a', 0.521726802480915)]
Finally, we have the estimated page rank for every page.
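If you would rather see the pages ordered from highest to lowest rank, one small sketch using the RDD sortBy() transformation is the following; it returns the same four pairs sorted by rank, with page b first:
>>> pageRanksRDD.sortBy(lambda x : x[1], ascending=False).collect()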