Faster average computations with aggregate

In the previous section, we saw how to use map and reduce to calculate averages. Now, let's look at faster average computations with the aggregate function. You can refer to the documentation mentioned in the previous section.

aggregate is a function that takes three arguments, none of which are optional.

The first one is the zeroValue argument, where we put in the base case for the aggregated result.

The second argument is the sequential operator (seqOp), which aggregates values on top of zeroValue. Starting from zeroValue, the seqOp function you feed into aggregate takes values from your RDD and stacks, or aggregates, them on top of it.

The last argument is combOp, which stands for combination operation. Here, we take the zeroValue arguments that have been aggregated through seqOp and combine them into a single value, concluding the aggregation.
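To make the three arguments concrete, here is a minimal pure-Python model of what aggregate does conceptually. This is an illustrative sketch, not Spark's actual implementation, and the partitioned-list input is an assumption for demonstration:

```python
from functools import reduce

# Illustrative sketch of aggregate's three arguments (not Spark's real code).
def aggregate(partitions, zero_value, seq_op, comb_op):
    # seqOp folds each partition's values on top of a copy of zeroValue...
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    # ...and combOp then merges the per-partition results into one value.
    return reduce(comb_op, per_partition, zero_value)

# Summing values spread across two partitions:
total = aggregate([[1, 2], [3, 4]], 0,
                  lambda acc, v: acc + v,   # seqOp
                  lambda a, b: a + b)       # combOp
# total == 10
```

The key point this model captures is that seqOp runs within each partition, while combOp only ever merges already-aggregated partial results.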

So, here we are aggregating the elements of each partition, and then the results for all the partitions, using a combine function and a neutral zero value. Here, we have two things to note:

  1. The op function is allowed to modify t1, but it should not modify t2.
  2. The first function, seqOp, can return a different result type, U.

In this case, we need one operation for merging a T into a U, and one operation for merging two Us.
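For example, the input type T can be a string while the aggregated type U is an integer. The following plain-Python sketch (the word list is hypothetical) mirrors what sc.parallelize(words).aggregate(0, seq_op, comb_op) would compute in PySpark:

```python
# T is str, U is int: seqOp merges a string into an int total,
# combOp merges two int totals.
words = ["spark", "aggregate", "rdd"]   # hypothetical sample data

zero_value = 0
seq_op = lambda total, word: total + len(word)   # (U, T) -> U
comb_op = lambda t1, t2: t1 + t2                 # (U, U) -> U

total_chars = zero_value
for w in words:                 # what Spark does within a single partition
    total_chars = seq_op(total_chars, w)
# total_chars == 17
```

Because seqOp's two arguments have different types, plain reduce could not express this aggregation; that is exactly the flexibility aggregate adds.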

Let's go to our Jupyter Notebook to check how this is done. aggregate allows us to calculate both the total duration and the count at the same time. We store the result in the duration_count variable. We take the duration RDD and aggregate it. Remember that aggregate takes three arguments. The first one is the initial value, that is, the zero value, (0,0). The second one is the sequential operation, as follows:

duration_count = duration.aggregate(
    (0, 0),
    (lambda db, new_value: (db[0] + new_value, db[1] + 1))
)

We need to specify a lambda function with two arguments. The first argument is the current accumulator, or aggregator, or what can also be called a database (db). The second argument, new_value, is the current value we're processing in the RDD. We simply want to do the right thing to the database, so to speak: we know that our database looks like a tuple, where the sum of duration is the first element and the count is the second element. Whenever we look at a new value, we add it to the current running total and add 1 to the current running count.

The running total is the first element, db[0], so we add new_value to it. We then simply add 1 to the second element, db[1], which is the count. That's the sequential operation.
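To see the sequential operation in action, we can trace it over a few hypothetical duration values in plain Python (the lambda is the same one we pass to aggregate):

```python
# The seqOp from the aggregate call, traced step by step.
seq_op = lambda db, new_value: (db[0] + new_value, db[1] + 1)

db = (0, 0)                              # zeroValue: (running total, count)
for new_value in [120.0, 300.0, 233.5]:  # hypothetical durations
    db = seq_op(db, new_value)
# db == (653.5, 3): a total duration of 653.5 across 3 records
```

Each step produces a fresh (total, count) tuple, so the accumulator always stays consistent with the values seen so far.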

Every time we get a new_value, as shown in the previous code block, we add it to the running total. And because we've added new_value to the running total, we increment the count by 1. Next, we need to put in the combinator operation, where we simply combine the respective elements of two separate databases, db1 and db2, as follows:

duration_count = duration.aggregate(
    (0, 0),
    (lambda db, new_value: (db[0] + new_value, db[1] + 1)),
    (lambda db1, db2: (db1[0] + db2[0], db1[1] + db2[1]))
)
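To see how combOp fits in, here is a plain-Python sketch of what Spark does behind the scenes: seqOp runs independently on each partition (the two small partitions below are hypothetical), and combOp then merges the partial results:

```python
# The same seqOp and combOp lambdas as in the aggregate call above.
seq_op = lambda db, new_value: (db[0] + new_value, db[1] + 1)
comb_op = lambda db1, db2: (db1[0] + db2[0], db1[1] + db2[1])

partition_a = [100.0, 200.0]          # hypothetical durations, partition 1
partition_b = [50.0, 150.0, 400.0]    # hypothetical durations, partition 2

db_a = (0, 0)
for v in partition_a:
    db_a = seq_op(db_a, v)            # db_a == (300.0, 2)

db_b = (0, 0)
for v in partition_b:
    db_b = seq_op(db_b, v)            # db_b == (600.0, 3)

combined = comb_op(db_a, db_b)        # combined == (900.0, 5)
```

Dividing the first element by the second, 900.0 / 5, then gives an average of 180.0 for this made-up data, which is exactly the calculation we perform on the real result next.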

Since duration_count is a tuple that collects our total duration in its first element and counts how many durations we looked at in its second element, computing the average is very simple: we divide the first element by the second, as follows:

duration_count[0]/duration_count[1]

This will give us the following output:

217.82472416710442

You can see that it returns the same result as we saw in the previous section, which is great. In the next section, we are going to look at pivot tabling with key-value paired data points.
