Map-side join using broadcast variable

Anyone familiar with Hive will be well aware of the concept of a map-side join. In distributed systems, data is spread across different nodes. So when two distributed datasets are joined on a key or column, the data needs to be shuffled over the network, because records belonging to the same key must reach the same node for the join to be performed.

Map-side join comes into the picture when one of the datasets taking part in the join is small. To avoid shuffling data over the network, the small dataset is loaded into memory on every node, and a map operation is then performed on the large dataset, joining each of its records against the small dataset already in memory. This type of join is known as a map-side join.
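The idea can be sketched in plain Java, outside of any framework: the small dataset lives in an in-memory map, and the large dataset is enriched record by record against it, with no shuffle involved. The class and variable names here are illustrative only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinSketch {
    public static void main(String[] args) {
        // Small dataset: loaded entirely into memory (the "broadcast" side)
        Map<String, String> countryIdToName = new HashMap<>();
        countryIdToName.put("101", "India");
        countryIdToName.put("102", "UK");

        // Large dataset: processed record by record (userId -> countryId)
        List<String[]> userIdToCountryId = Arrays.asList(
            new String[]{"1", "101"}, new String[]{"2", "102"});

        // Map-side join: each record is joined via an in-memory lookup,
        // so no data has to move between nodes
        for (String[] record : userIdToCountryId) {
            System.out.println(record[0] + "," + record[1] + ","
                + countryIdToName.get(record[1]));
        }
    }
}
```

The loop prints one joined row per record of the large dataset; this is exactly the shape of work Spark distributes when the small side is broadcast.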

In Apache Spark, a Map-side join can be performed with the help of a broadcast variable. Consider we have the following datasets:

  • A user ID to country ID mapping for all the users who logged in to a popular website throughout the world in the last week.
  • A country ID to country name mapping.

Our requirement is to find the name of the country for every user ID. The straightforward solution is to join both datasets. However, the dataset containing the country ID to country name mapping is small (as of 2017, there are fewer than two hundred countries in the world), so we can load it in memory. Let's look at it programmatically.

The two datasets are:

JavaPairRDD<String, String> userIdToCityId = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("1", "101"), new Tuple2<>("2", "102"),
    new Tuple2<>("3", "107"), new Tuple2<>("4", "103"),
    new Tuple2<>("11", "101"), new Tuple2<>("12", "102"),
    new Tuple2<>("13", "107"), new Tuple2<>("14", "103")));

JavaPairRDD<String, String> cityIdToCityName = jsc.parallelizePairs(Arrays.asList(
    new Tuple2<>("101", "India"), new Tuple2<>("102", "UK"),
    new Tuple2<>("103", "Germany"), new Tuple2<>("107", "USA")));

We have kept the datasets small for the purpose of this example. In reality, the datasets would be loaded from external systems.

Now, if you join the datasets, shuffling is bound to happen. As already described in Chapter 4, Understanding Spark Programming Model, the join transformation provides an overloaded method through which the user can supply a custom partitioner for the shuffle.
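To see why a shuffle is needed at all, consider how a partitioner works. The sketch below is not Spark's API; it is a plain-Java illustration of the idea behind hash partitioning: the partition for a key is derived from its hash code, so records with equal keys always land in the same partition (and hence on the same node).

```java
import java.util.Arrays;
import java.util.List;

public class HashPartitioningSketch {
    // Mirrors the idea behind a hash partitioner: equal keys
    // always map to the same partition
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        List<String> countryIds = Arrays.asList("101", "102", "101", "107");
        for (String id : countryIds) {
            System.out.println("key " + id + " -> partition "
                + partitionFor(id, numPartitions));
        }
        // "101" maps to the same partition both times, which is what a
        // shuffle-based join relies on to co-locate matching records
    }
}
```

A custom partitioner simply replaces this hash-based placement with user-defined logic; either way, data still moves across the network, which is what broadcasting avoids.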

Instead of running the join operation, we will broadcast the dataset cityIdToCityName as follows:

Broadcast<Map<String, String>> citiesBroadcasted = jsc.broadcast(cityIdToCityName.collectAsMap());

Now, we can simply run a map operation on the RDD userIdToCityId to achieve our goal without any shuffling (the map transformation does not shuffle data), as follows:

JavaRDD<Tuple3<String, String, String>> joined = userIdToCityId.map(
    v1 -> new Tuple3<>(v1._1(), v1._2(),
        citiesBroadcasted.value().get(v1._2())));
System.out.println(joined.collect());

The output dataset will contain three fields in every row: userId, countryId, and the respective country name. In this section, we learnt about broadcast variables, their lifecycle, and their real-world applications. In the next section, we will learn about another cluster-wide shared variable called the Accumulator.
