Available actions on key/value pairs

In this section, we will be covering the following topics:

  • Available transformations on key/value pairs
  • Using countByKey()
  • Understanding the other methods

So, this is our familiar test setup, in which we will be using transformations on key/value pairs.

First, we will create an array of user transactions for users A, B, A, B, and C for some amount, as per the following example:

val keysWithValuesList = Array(
  UserTransaction("A", 100),
  UserTransaction("B", 4),
  UserTransaction("A", 100001),
  UserTransaction("B", 10),
  UserTransaction("C", 10)
)

We then need to key our data by a specific field, as per the following example:

val keyed = data.keyBy(_.userId)

We will key it by userId, by invoking the keyBy method with a userId parameter.

Now, our data is assigned to the keyed variable, and it is an RDD of tuples. The first element of each tuple is a string, that is, the userId, and the second element is the UserTransaction itself.
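As a local sketch of what keyBy produces (using a plain Scala list as a stand-in for the RDD, with a hypothetical UserTransaction case class matching the fields used in the text):

```scala
// Hypothetical model of the transaction record used in this chapter.
case class UserTransaction(userId: String, amount: Int)

val keysWithValuesList = List(
  UserTransaction("A", 100),
  UserTransaction("B", 4),
  UserTransaction("A", 100001),
  UserTransaction("B", 10),
  UserTransaction("C", 10)
)

// keyBy(_.userId) on an RDD yields (userId, transaction) pairs;
// on a local collection, the same shape is a simple map to tuples.
val keyed: List[(String, UserTransaction)] =
  keysWithValuesList.map(t => (t.userId, t))
```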

Let's look at the transformations that are available. First, we will look at countByKey.

Let's look at its implementation, as shown in the following example:

//given
val data = spark.parallelize(keysWithValuesList)
val keyed = data.keyBy(_.userId)
//when
val counted = keyed.countByKey()
// keyed.combineByKey()
// keyed.aggregateByKey()
// keyed.foldByKey()
// keyed.groupByKey()
//then
counted should contain theSameElementsAs Map("B" -> 2, "A" -> 2, "C" -> 1)

This returns a Map[K, Long], where K is a generic type because it can be any type of key; in this example, the key will be a String. Any operation that returns a Map is not entirely safe. If you see a method signature that returns a Map, it is a sign that the data will be sent to the driver and needs to fit in its memory. If there is too much data to fit into the driver's memory, we will run out of memory. Hence, we need to be cautious when using this method.
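A local sketch of what countByKey computes (not Spark's implementation): group the pairs by key and count each group, yielding one Long counter per distinct key, all of which must fit on the driver:

```scala
// Hypothetical local stand-in for the keyed RDD from the text.
case class UserTransaction(userId: String, amount: Int)

val keyed = List(
  ("A", UserTransaction("A", 100)),
  ("B", UserTransaction("B", 4)),
  ("A", UserTransaction("A", 100001)),
  ("B", UserTransaction("B", 10)),
  ("C", UserTransaction("C", 10))
)

// Local equivalent of RDD.countByKey(): a Map[K, Long] with one
// counter per distinct key -- small, as long as keys are small and few.
val counted: Map[String, Long] =
  keyed.groupBy(_._1).map { case (k, vs) => (k, vs.size.toLong) }
```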

We then perform an assert count that should contain the same elements as the map, as per the following example:

counted should contain theSameElementsAs Map("B" -> 2, "A" -> 2, "C" -> 1)

B is 2 because we have two values for it, and A is also 2 for the same reason; C is 1 because it has only one value. countByKey() is not memory expensive because it only stores a key and a counter per key. However, if the key is a complex and big object, for example, a transaction with many fields, then that map could still be really big.

When we run this test, we can see that it passes.

We also have a combineByKey() method, which combines values for the same key, and aggregateByKey(), which is able to aggregate values into a different type. We have foldByKey, which takes the current state and a value, but returns the same type as the value for the key.
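A rough local sketch of the difference between these two: a foldByKey-style aggregation keeps the value type, while an aggregateByKey-style aggregation can produce a different result type (here, Int amounts summed into a Long). These are hand-rolled equivalents on a plain collection, not the Spark implementations:

```scala
// Hypothetical (userId, amount) pairs matching the chapter's data.
val pairs = List(("A", 100), ("B", 4), ("A", 100001), ("B", 10), ("C", 10))

// foldByKey-style: the result type is the same as the value type (Int).
val folded: Map[String, Int] =
  pairs.groupBy(_._1).map { case (k, vs) =>
    (k, vs.map(_._2).foldLeft(0)(_ + _))
  }

// aggregateByKey-style: the result type (Long) differs from the
// value type (Int), which foldByKey cannot express.
val aggregated: Map[String, Long] =
  pairs.groupBy(_._1).map { case (k, vs) =>
    (k, vs.map(_._2).foldLeft(0L)(_ + _))
  }
```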

We also have groupByKey(), which we learned about in the previous section. This groups everything by the specific key and returns an iterator of values for each key. This is a very memory-expensive operation as well, so we need to be careful when we use it.
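Locally, groupByKey corresponds to grouping and keeping every value for a key, which is why it is memory-hungry: unlike countByKey, which keeps one counter per key, all values are materialized together (again a sketch on a plain collection, not Spark's shuffle-based implementation):

```scala
// Hypothetical (userId, amount) pairs matching the chapter's data.
val pairs = List(("A", 100), ("B", 4), ("A", 100001), ("B", 10), ("C", 10))

// groupByKey-style: every value for a key is retained, so memory use
// grows with the number of values per key, not just the number of keys.
val grouped: Map[String, List[Int]] =
  pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
```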

In the next section, we'll be using aggregateByKey instead of groupBy. We will learn how groupBy works and fix its shortcomings.
