Using Python and Hadoop Streaming to perform a time series analytic

This recipe shows how to use Hadoop Streaming with Python to perform a basic time series analysis over the cleansed ACLED Nigeria dataset. The program is designed to output a list of dates in sorted order for each location where the government in Nigeria regained territory.

For this recipe, we will use structured Nigerian conflict data provided by the Armed Conflict Location and Event Data (ACLED) collections team.

Getting ready

You will need a working Python installation on every node in the cluster, and the cleansed ACLED Nigeria dataset (Nigeria_ACLED_cleaned.tsv) loaded into HDFS at /input/acled_cleaned/.

How to do it...

The following are the steps to use Python with Hadoop Streaming:

  1. Create a shell script named run_location_regains.sh that runs the Streaming job. It is important to change the streaming JAR path to match the absolute path of your hadoop-streaming.jar file. The path of the hadoop-streaming.jar file is different depending on the Hadoop distribution (a quick way to locate it is shown after these steps):
    #!/bin/bash
    $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u1.jar \
        -input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv \
        -output /output/acled_analytic_out \
        -mapper location_regains_mapper.py \
        -reducer location_regains_by_time.py \
        -file location_regains_by_time.py \
        -file location_regains_mapper.py \
        -jobconf stream.num.map.output.key.fields=2 \
        -jobconf map.output.key.field.separator=$'\t' \
        -jobconf num.key.fields.for.partition=1 \
        -jobconf mapred.reduce.tasks=1
  2. Create a Python file named location_regains_mapper.py and add the following:
    #!/usr/bin/python
    import sys

    for line in sys.stdin:
        # fields: location, date, type, actor, latitude, longitude, source, fatalities
        (loc, event_date, event_type, actor, lat, lon, src, fatalities) = line.strip().split('\t')
        (day, month, year) = event_date.split('/')
        if len(day) == 1:
            day = '0' + day
        if len(month) == 1:
            month = '0' + month
        if len(year) == 2:
            # expand two-digit years: 31-99 -> 19xx, 00-30 -> 20xx
            if int(year) > 30:
                year = '19' + year
            else:
                year = '20' + year
        event_date = year + '-' + month + '-' + day
        print '\t'.join([loc, event_date, event_type])
  3. Create a Python file named location_regains_by_time.py and add the following:
    #!/usr/bin/python
    import sys

    current_loc = "START_OF_APP"
    govt_regains = []
    for line in sys.stdin:
        (loc, event_date, event_type) = line.strip('\n').split('\t')
        if loc != current_loc:
            # a new location means the previous one is complete; emit its dates
            if current_loc != "START_OF_APP":
                print current_loc + '\t' + '\t'.join(govt_regains)
            current_loc = loc
            govt_regains = []
        if event_type.find('regains') != -1:
            govt_regains.append(event_date)

    # emit the dates collected for the final location
    if current_loc != "START_OF_APP":
        print current_loc + '\t' + '\t'.join(govt_regains)
  4. Run the shell script from the local working directory, which should contain all of the Python scripts that we created previously:
    ./run_location_regains.sh

You should see the job start from the command line and finish successfully:

INFO streaming.StreamJob: Output: /output/acled_analytic_out
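
If you are unsure where your distribution keeps the streaming JAR referenced in step 1, or if the job fails because the task cannot execute the scripts, the following commands, run from the local working directory, are a quick sanity check; the exact JAR name will vary by distribution:

# Locate the streaming JAR for your Hadoop distribution
find $HADOOP_HOME -name 'hadoop-streaming*.jar'

# Hadoop Streaming launches the scripts via their shebang lines,
# so they generally need to be executable
chmod +x location_regains_mapper.py location_regains_by_time.py run_location_regains.sh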

How it works...

The shell script sets up the Hadoop Streaming JAR path and passes the necessary arguments to the program. Each argument is explained in detail as follows:

-input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv
    The HDFS path to the input data for MapReduce.

-output /output/acled_analytic_out
    The HDFS path where MapReduce will write the job output.

-mapper location_regains_mapper.py
    Script to be run as the map function; records are passed via stdin/stdout.

-reducer location_regains_by_time.py
    Script to be run as the reduce function.

-file location_regains_by_time.py
    Adds a file to the distributed cache. This is required for external scripts.

-file location_regains_mapper.py
    Adds a file to the distributed cache.

-jobconf stream.num.map.output.key.fields=2
    Tells the Streaming tool which fields should be treated as the map output key. Our mapper outputs three fields per record; this parameter treats the first two as the key. This leverages the secondary sort feature in MapReduce to sort rows on the composite of these two fields.

-jobconf map.output.key.field.separator=$'\t'
    Sets the delimiter token on the key; here it is the tab character.

-jobconf num.key.fields.for.partition=1
    Guarantees that all map output records with the same value in the first field of the key are sent to the same reducer.

-jobconf mapred.reduce.tasks=1
    The number of reduce task JVMs to run over the output keys.
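
As a side note, more recent Hadoop releases deprecate -jobconf in favor of the generic -D option. The following is a sketch of the same submission using -D; STREAMING_JAR is a placeholder for the absolute path to your distribution's hadoop-streaming JAR:

# STREAMING_JAR is a placeholder; substitute your hadoop-streaming JAR path
$HADOOP_HOME/bin/hadoop jar $STREAMING_JAR \
    -D stream.num.map.output.key.fields=2 \
    -D map.output.key.field.separator=$'\t' \
    -D num.key.fields.for.partition=1 \
    -D mapred.reduce.tasks=1 \
    -input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv \
    -output /output/acled_analytic_out \
    -mapper location_regains_mapper.py \
    -reducer location_regains_by_time.py \
    -file location_regains_mapper.py \
    -file location_regains_by_time.py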

The Python script used in the map phase receives a line for each record. We call strip() to remove any leading/trailing whitespace and then split the line on tabs. The result is unpacked into variables descriptively named after the row fields they hold.

The event_date field in the raw input requires some processing. In order for the framework to sort records in ascending order of date, we want to take the current form, which is dd/mm/yy, and convert it to yyyy-mm-dd. Since some of the events occurred before the year 2000, we need to expand the year variable out to four digits. Single-digit days and months are zero-padded so that the dates sort correctly.
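
For example, feeding the mapper a single made-up record from the command line (the field values here are purely illustrative, not taken from the dataset) shows the reformatting:

printf 'Abuja\t9/8/10\tGovernment regains territory\tactor\t9.08\t7.53\tsource\t0\n' \
  | python location_regains_mapper.py
# prints (tab-separated): Abuja  2010-08-09  Government regains territory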

This analytic only requires location, event_date, and event_type to be output to the reduce stage. In the shell script, we specified the first two fields as the output key. Specifying location as the first field groups all records with the same location on a common reducer. Specifying event_date as the second field allows the MapReduce framework to sort the records by the composite of location and event_date. The value in each key-value pair is simply the event_type field.

Sample map output:

(cityA, 2010-08-09, explosion)
(cityB, 2008-10-10, fire)
(cityA, 2009-07-03, riots)

The reducer then sees the records sorted on the composite value of location and event_date:

(cityA, 2009-07-03, riots)
(cityA, 2010-08-09, explosion)
(cityB, 2008-10-10, fire)

Our configuration specifies only one reducer, so in this recipe all of the rows will partition to the same reduce Java Virtual Machine (JVM). If multiple reduce tasks are specified, cityA and cityB could be processed independently on separate reduce JVMs.

Understanding how the MapReduce framework sorts and partitions the output of location_regains_mapper.py is important for understanding how the reduce script works.

We use location_regains_by_time.py to iterate over the sorted collection of events per location, and aggregate events that match a particular type.

As the records are partitioned by location, we can assume that each partition will go to its own reducer. Furthermore, because we specified event_date as an additional sort field, we can assume that the events for a given location arrive sorted by date in ascending order. Now we are in a position to understand how the script works.

The script must keep track of when the incoming loc value changes from the previous location. Such a change signifies that we are done processing the previous location, since the records arrive in sorted order. We initialize the current_loc flag to START_OF_APP. We also declare an empty array, govt_regains, to hold the dates of the events we are interested in.

The program starts by processing each line into variables. If there is a change in loc and it is not the beginning of the application, we know to output the current govt_regains collection to standard out. The change means that we are done processing the previous location, and can safely write its collection of event dates out of the reducer.

If the incoming loc value is the same as current_loc, we know that the incoming event still corresponds to the location we are currently processing. We then check whether the event_type contains the word regains, which indicates that the government regained territory in that location. If it matches, we add the event's date to the current govt_regains collection. Since the incoming records are sorted by event_date, we are guaranteed that the dates are inserted into govt_regains in ascending order.

The net result is a single part file output from the reducer, containing a list of locations in lexicographically sorted order. To the right of each location is a tab-separated, sorted list of dates marking the occasions when the government regained territory in that location.
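
If you want to sanity-check the two scripts before submitting the job, you can approximate the shuffle and sort phase locally with the Unix sort command. This sketch assumes a copy of Nigeria_ACLED_cleaned.tsv is available in the local working directory:

# map, sort on the composite key (fields 1-2, tab-delimited), then reduce
cat Nigeria_ACLED_cleaned.tsv \
  | python location_regains_mapper.py \
  | sort -t$'\t' -k1,2 \
  | python location_regains_by_time.py \
  | head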

There's more...

Hadoop Streaming is a very popular component. The following are a few additional points worth knowing:

Using Hadoop Streaming with any language that can read from stdin and write to stdout

You are not limited to Python when working with Hadoop Streaming. Java classes, shell scripts, Ruby scripts, and many other languages are frequently used to turn existing code and functionality into full-fledged MapReduce programs. Any language that can read from stdin and write to stdout will work with Hadoop Streaming.
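
As a trivial illustration, the standard Unix tools can serve as the mapper and reducer; the following sketch (the output path is just an example) counts the lines, words, and characters of the input with no custom code at all:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u1.jar \
    -input /input/acled_cleaned/Nigeria_ACLED_cleaned.tsv \
    -output /output/acled_wc_out \
    -mapper /bin/cat \
    -reducer /usr/bin/wc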

Using the -file parameter to pass additional required files for MapReduce jobs

Similar to normal MapReduce programs, you can pass additional dependencies via the distributed cache to be used in your applications. Simply add additional -file parameters. For example:

-file mapper.py 
-file wordlist.txt