Joining skewed data using Apache Pig skewed join

Data skew is a serious problem in a distributed processing environment. It occurs when data is not evenly distributed across the keys emitted by the map phase, which leads to inconsistent processing times. In the MapReduce framework, data skew can cause some mappers or reducers to take significantly longer to complete their tasks than the other mappers or reducers in the job.
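To see why a hot key is a problem, consider how a plain hash partitioner assigns records to reducers. The following shell sketch is purely illustrative (the key values, the file name demo_keys.txt, and the length-based stand-in for a hash function are all made up for the demo); it shows one reducer receiving almost all of the records:

```shell
# Hypothetical demo data (demo_keys.txt is a made-up name): one hot key
# repeated 5,000 times plus 1,000 records spread over 250 ordinary keys.
awk 'BEGIN { for (i = 0; i < 5000; i++) print "221.220.8.0";
             for (i = 0; i < 1000; i++) printf "10.0.0.%d\n", i % 250 }' > demo_keys.txt

# Assign each record to one of 4 "reducers" with a crude stand-in for
# hash(key) % reducers: every record for a key lands on the same reducer.
awk '{ load[length($1) % 4]++ }
     END { for (i = 0; i < 4; i++) printf "reducer %d: %d records\n", i, load[i] }' demo_keys.txt
```

The reducer that happens to own the hot key receives all 5,000 of its records while the other three share the remaining 1,000; in a real job, that reducer becomes the straggler that delays the whole stage.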

Apache Pig provides a skewed join to help alleviate the data skew issue with joins. This recipe will demonstrate how to join a skewed dataset with a small table.

Getting ready

Download the apache_nobots_tsv.txt and nobots_ip_country_tsv.txt datasets from http://www.packtpub.com/support and place them in your current working directory. You will also need a recent version of Apache Pig (0.9 or later) installed on the cluster.

To skew the apache_nobots_tsv.txt file, create the following shell script to append the same row a few thousand times to a new file named skewed_apache_nobots_tsv.txt:

#!/bin/bash

cat apache_nobots_tsv.txt > skewed_apache_nobots_tsv.txt
for i in {1..5000}
do
  head -n1 apache_nobots_tsv.txt >> skewed_apache_nobots_tsv.txt
done

The IP address 221.220.8.0 will appear a significantly higher number of times in the skewed_apache_nobots_tsv.txt file than any other IP address.
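Before loading the file into HDFS, you can sanity-check the skew by counting the most frequent values of the first (IP) column. The pipeline below is shown against a tiny stand-in file (sample_tsv.txt is a hypothetical name used only so the snippet runs on its own); point the same pipeline at skewed_apache_nobots_tsv.txt to see 221.220.8.0 dominate the counts:

```shell
# Build a tiny stand-in TSV file (hypothetical data) so the pipeline below
# is self-contained; substitute skewed_apache_nobots_tsv.txt in practice.
printf '221.220.8.0\tpageA\n221.220.8.0\tpageB\n10.0.0.1\tpageC\n' > sample_tsv.txt

# Count occurrences of each IP (first tab-separated column), most frequent first.
cut -f1 sample_tsv.txt | sort | uniq -c | sort -rn | head -3
```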

Place the skewed_apache_nobots_tsv.txt and nobots_ip_country_tsv.txt files into HDFS:

$ hadoop fs -put skewed_apache_nobots_tsv.txt /user/hadoop/
$ hadoop fs -put nobots_ip_country_tsv.txt /user/hadoop/

How to do it...

Follow the steps to perform a skewed join in Apache Pig:

  1. Open a text editor and create a file named skewed_join.pig. Create two relations to load the two datasets:
    nobots_weblogs = LOAD '/user/hadoop/skewed_apache_nobots_tsv.txt' AS (ip: chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
    ip_country_tbl = LOAD '/user/hadoop/nobots_ip_country_tsv.txt' AS (ip:chararray, country:chararray);
  2. Join the two datasets using the skewed keyword:
    weblog_country_jnd = JOIN nobots_weblogs BY ip, ip_country_tbl BY ip USING 'skewed';
  3. Format the joined relationship and store the result:
    cleaned = FOREACH weblog_country_jnd GENERATE ip_country_tbl::ip, country, timestamp, page, http_status, payload_size, useragent;
    STORE cleaned INTO '/user/hadoop/weblog_country_jnd_skewed';
  4. Run the job:
    $ pig -f skewed_join.pig

How it works...

In step 1, we defined two relations, nobots_weblogs and ip_country_tbl, to refer to the two datasets.

In step 2, we joined the two datasets on the ip field using Pig's skewed join. Pig launches two MapReduce jobs to perform the skewed join. The first job samples the skewed_apache_nobots_tsv.txt dataset (the skewed input). The second job performs a reduce-side join; Pig decides how the data is distributed to the reducers based on the sample collected by the first job. If skew is present in the dataset, Pig will attempt to balance the data distribution across the reducers.
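The two-phase idea can be sketched in shell. This is only an illustration of the general technique, not Pig's actual implementation: the file names, the 1 percent sampling rate, the hot-key threshold, and the length-based stand-in hash are all made up for the demo. Phase 1 samples the skewed input to flag hot keys; phase 2 spreads a hot key's rows round-robin across the reducers while other keys are hashed as usual:

```shell
# Hypothetical skewed input (big_keys.txt is a made-up name): 5,000 rows for
# the hot key followed by 100 rows with ordinary keys.
awk 'BEGIN { for (i = 0; i < 5000; i++) print "221.220.8.0";
             for (i = 0; i < 100; i++) printf "10.0.0.%d\n", i }' > big_keys.txt

# Phase 1: sample roughly 1% of the rows and flag keys that dominate the sample.
awk 'NR % 100 == 0 { seen[$1]++; n++ }
     END { for (k in seen) if (seen[k] > n / 2) print k }' big_keys.txt > hot_keys.txt

# Phase 2: rows for hot keys go round-robin over the 4 "reducers"; everything
# else is partitioned by a crude stand-in for an ordinary hash partitioner.
awk 'NR == FNR { hot[$1] = 1; next }
     { r = ($1 in hot) ? FNR % 4 : length($1) % 4; load[r]++ }
     END { for (i = 0; i < 4; i++) printf "reducer %d: %d rows\n", i, load[i] }' \
    hot_keys.txt big_keys.txt > skewed_load.txt
cat skewed_load.txt
```

Here the 5,000 hot-key rows end up spread evenly, 1,250 per reducer, instead of landing on a single reducer. To keep the join correct after such a split, the matching rows from the other (small) input must be sent to every reducer that holds a piece of the hot key, which is why the skewed join works best when the second relation is small.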
