Marking the longest period of non-violence using Hive MAP/REDUCE operators and Python

The Hive query language provides facilities to control the MapReduce dataflow and to inject your own custom map and reduce scripts at each stage. When used properly, this is a very powerful technique for writing concise MapReduce programs with minimal syntax.

This recipe will show a complete example of how to write custom MapReduce control flow using different operators in Hive. The analytic will specifically look for the longest gap in events for each location to get an idea of how frequently violence occurs in that location.

Getting ready

Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.

Your cluster will also need Python 2.7 or a later 2.x release installed on each node and available on the environment path for the Hadoop user (the script uses Python 2 print statements). The script shown in this recipe assumes that the Python interpreter lives at /usr/bin/python. If this does not match your installation, change the script's first line accordingly.

This recipe depends on having the Nigeria_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the following fields mapped to the respective datatypes.

Issue the following command to the Hive client:

describe acled_nigeria_cleaned;

You should see the following response:

OK
loc  string
event_date  string
event_type  string
actor  string
latitude  double
longitude  double
source  string
fatalities  int

How to do it...

Perform the following steps to mark the longest period of non-violence using Hive:

  1. Open a text editor of your choice, ideally one with SQL and Python syntax highlighting.
  2. Add the following inline creation and transform syntax:
    SET mapred.child.java.opts=-Xmx512M;
    
    DROP TABLE IF EXISTS longest_event_delta_per_loc;
    CREATE TABLE longest_event_delta_per_loc (
        loc STRING,
        start_date STRING,
        end_date STRING,
        days INT
    );
    
    ADD FILE calc_longest_nonviolent_period.py;
    FROM (
            SELECT loc, event_date, event_type
            FROM acled_nigeria_cleaned
            DISTRIBUTE BY loc SORT BY loc, event_date
         ) mapout
    INSERT OVERWRITE TABLE longest_event_delta_per_loc
    REDUCE mapout.loc, mapout.event_date, mapout.event_type
    USING 'python calc_longest_nonviolent_period.py'
    AS loc, start_date, end_date, days;
  3. Save the file in the local working folder as longest_nonviolent_periods_per_location.sql.
  4. Create a new file in your text editor with the name calc_longest_nonviolent_period.py and save it in the same working folder as longest_nonviolent_periods_per_location.sql.
  5. Add the Python syntax. Python is sensitive to indentation. Keep that in mind if you are cutting and pasting this code:
    #!/usr/bin/python
    import sys
    from datetime import datetime, timedelta
    
    current_loc = "START_OF_APP"
    (prev_date, start_date, end_date, start_time_obj, end_time_obj, current_diff) = ('', '', '', None, None, timedelta.min)
    for line in sys.stdin:
      (loc, event_date, event_type) = line.strip('\n').split('\t')
      if loc != current_loc and current_loc != "START_OF_APP":
        if end_date != '':
          print '\t'.join([current_loc, start_date, end_date, str(current_diff.days)])
        # reset the per-location placeholders before processing the new location
        (prev_date, start_date, end_date, start_time_obj, end_time_obj, current_diff) = ('', '', '', None, None, timedelta.min)
      current_loc = loc
      end_time_obj = datetime.strptime(event_date, '%Y-%m-%d')
      if start_time_obj is not None: # implies at least two events for this location
        diff = end_time_obj - start_time_obj
        if diff > current_diff:
          current_diff = diff # set the current max time delta
          start_date = prev_date
          end_date = event_date
      prev_date = event_date
      start_time_obj = end_time_obj
    # flush the final location; input ends without another location change
    if end_date != '':
      print '\t'.join([current_loc, start_date, end_date, str(current_diff.days)])
  6. Run the script from the operating system's shell by supplying the -f option to the Hive client:
    hive -f longest_nonviolent_periods_per_location.sql
  7. Issue the following query directly to the Hive shell. You should see rows printed to the console in no particular order:
    hive –e "select * from longest_event_delta_per_loc;"

How it works...

Let's start with the Hive script we created. The first line is simply to force a certain JVM heap size in our execution. You can set this to whatever is appropriate for your cluster. For the ACLED Nigeria dataset, 512 MB is more than enough.

Then we create our table definition for the output, dropping any existing table with the matching name longest_event_delta_per_loc. The table requires four fields per record: loc for the location, start_date to hold the event_date value at the lower bound of the gap, end_date to hold the event_date value at the upper bound, and days to show the total number of days elapsed between the two events.

We then add the file calc_longest_nonviolent_period.py to the distributed cache for use across the different reducer JVMs. This will be used as our reduce script, but first we must organize the map output. The inner SELECT statement grabs loc, event_date, and event_type from the acled_nigeria_cleaned table in Hive. The DISTRIBUTE BY loc statement tells Hive to guarantee that all rows with matching values for loc go to the same reducer. SORT BY loc, event_date tells Hive to sort the data as it arrives to each reducer by the combination of loc and event_date. Now the same reducer can process every row corresponding to each location locally, and in the sorted order of event_date.
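
If it helps to picture that contract, here is a minimal Python sketch of it. This is illustrative only: the reducer count and hash routing are assumptions made for this example, not Hive's actual partitioner.

    # Models the DISTRIBUTE BY loc / SORT BY loc, event_date contract.
    # Hive's real shuffle is implemented differently, but the guarantee
    # made to the reduce script is the same.
    NUM_REDUCERS = 4  # hypothetical reducer count

    def simulate_shuffle(rows):
        """rows: an iterable of (loc, event_date, event_type) tuples."""
        buckets = [[] for _ in range(NUM_REDUCERS)]
        for row in rows:
            # DISTRIBUTE BY loc: rows with the same loc land in one bucket
            buckets[hash(row[0]) % NUM_REDUCERS].append(row)
        for bucket in buckets:
            # SORT BY loc, event_date: ordering holds within each bucket only
            bucket.sort(key=lambda r: (r[0], r[1]))
        return buckets

Each resulting bucket is what a single invocation of the reduce script sees on stdin.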

We alias the output of this SELECT statement as mapout and use the shorthand REDUCE operator to process each row from mapout. The USING clause lets us supply a custom Python script that reads each record over stdin. The AS clause maps the tab-delimited fields the script writes to stdout onto the columns of the receiving table.

The Python script calc_longest_nonviolent_period.py will be used by the reduce stage to compute the longest time gap between the events for each location. Since we have guaranteed that all records with a common loc value are at the same reducer and that those records are in the date-sorted order for each location, we are now in a position to understand how the Python script works.

In the Python script calc_longest_nonviolent_period.py, we start with #!/usr/bin/python as a hint to the shell on how to execute the script. We need to import sys to use the stdin and stdout operations. We also need the datetime and timedelta class definitions from the datetime package.
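
As a quick illustration of how these classes behave (the dates here are invented, not taken from the dataset):

    from datetime import datetime, timedelta

    earlier = datetime.strptime('2011-01-01', '%Y-%m-%d')
    later = datetime.strptime('2011-03-15', '%Y-%m-%d')
    gap = later - earlier       # subtracting datetimes yields a timedelta
    print gap.days              # 73, the kind of value the script emits
    print gap > timedelta.min   # True; any real gap beats the sentinel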

The script operates very procedurally and can be a bit difficult to follow. First we declare current_loc and initialize it to START_OF_APP, which acts as a flag for the output conditional. We then set up several placeholder variables that the for loop maintains on a per-location basis:

  • prev_date: This holds the last observed event_date for the current loc value. It is blank at the start of the app or when a new location begins.
  • start_date: This holds the lower bound of the longest time gap observed so far between events for that value of loc.
  • end_date: This holds the upper bound of the longest time gap observed so far between events for the value of current_loc.
  • start_time_obj: This holds the datetime object parsed from the previously processed row's event_date, or None at the start of the app or when a new location begins.
  • end_time_obj: This holds the datetime object parsed from the current row's event_date, or None at the start of the app or when a new location begins.
  • current_diff: This holds the time delta of the longest currently observed gap between events for current_loc. It starts as the lowest possible time delta (timedelta.min) at the start of the app or when a new location begins.
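
To make these roles concrete, here is a short trace of the placeholders across three rows for a single location; the dates are invented:

    row processed       current_diff     start_date     end_date
    Aba  2011-01-01     timedelta.min    ''             ''
    Aba  2011-01-15     14 days          2011-01-01     2011-01-15
    Aba  2011-03-01     45 days          2011-01-15     2011-03-01

After each row, prev_date and start_time_obj are advanced to the current row's event_date so that the next iteration can compute its gap.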

The for loop reads rows over stdin that have already been sorted by the combination of loc and event_date. We parse each row into variables representing the column values by first stripping any additional newlines and splitting the line on tab characters.

The first conditional is skipped while current_loc is equal to START_OF_APP; we have only begun processing the first row on that reducer and have nothing to output yet. When we see a value for loc that differs from current_loc and we are not at the start of the application, that signals we are done processing the rows for current_loc, and we can safely output the longest time delta between events for that location. If end_date is still set to an empty string, we saw only one event for that location, so there is no gap to report and we output nothing. Finally, we reset the six placeholder variables previously explained, so that we can accurately process the records of the next location.

Following the conditional, we immediately set current_loc equal to loc, so that we do not re-enter that conditional on the next iteration when the location has not changed. We then set end_time_obj from the event_date of the current row. If start_time_obj is None, we are on the first row for that location and cannot yet compute a time delta. Whether or not start_time_obj is None, at the end of the loop body we set prev_date equal to event_date and start_time_obj equal to the current end_time_obj. On the next iteration, therefore, start_time_obj holds the event_date of the previous record, while end_time_obj holds the event_date of the current record.

When start_time_obj is no longer None after the first iteration for a given location, we can begin comparing the two datetime objects. Subtracting start_time_obj from end_time_obj yields a timedelta object; if it is larger than current_diff, it becomes the new value of current_diff. In doing so, we capture the longest elapsed period between events for that location. We also set start_date and end_date so they are ready for output once we are done processing the location. As mentioned earlier, whether or not we update current_diff, we then set prev_date to event_date and start_time_obj to the current end_time_obj.

The next time the loop encounters a loc that is not equal to current_loc, we output the longest currently held time difference between events before moving on to the next location. Each print to stdout writes a row into the receiving Hive table containing the location held by current_loc, the lower-bound event_date string held by start_date, the upper-bound event_date string held by end_date, and the total number of days elapsed between those two dates, held by current_diff.days. Once the loop exhausts stdin, the same check runs one final time so that the last location on the reducer is not silently dropped.
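
A quick way to sanity-check the reducer logic before involving Hive is to pipe a few pre-sorted rows into the script locally. The following sketch assumes calc_longest_nonviolent_period.py is in the current directory; the locations and dates are invented sample data:

    import subprocess

    # sample rows pre-sorted by loc, event_date, as Hive would deliver them
    sample = '\n'.join([
        'Aba\t2011-01-01\tRiots/Protests',
        'Aba\t2011-01-15\tBattle',
        'Aba\t2011-03-01\tRiots/Protests',
        'Lagos\t2011-02-02\tBattle',  # single-event location: emits nothing
    ]) + '\n'

    proc = subprocess.Popen(['python', 'calc_longest_nonviolent_period.py'],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    out, _ = proc.communicate(sample)
    print out  # expected (tab-delimited): Aba 2011-01-15 2011-03-01 45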

There's more...

Here are a few additional notes on some of the operations touched upon in this recipe:

SORT BY versus DISTRIBUTE BY versus CLUSTER BY versus ORDER BY

These four operator variants are a frequent source of confusion for Hive beginners. Here's a quick comparison so you'll know which one is appropriate for your use case:

  • DISTRIBUTE BY: Rows with matching column values will partition to the same reducer. When used alone, it does not guarantee sorted input to the reducer.
  • SORT BY: This dictates which columns to sort by when ordering reducer input records.
  • CLUSTER BY: This is a shorthand operator that performs both DISTRIBUTE BY and SORT BY operations on the same group of columns; for example, CLUSTER BY loc is equivalent to DISTRIBUTE BY loc SORT BY loc.
  • ORDER BY: This is similar to the traditional SQL operator. Sorted order is maintained across all of the output from every reducer. Use this with caution as it can force all of the output records to a single reducer to perform the sorting. Usage with LIMIT is strongly recommended.

MAP and REDUCE keywords are shorthand for SELECT TRANSFORM

The Hive keywords MAP and REDUCE are shorthand notations for SELECT TRANSFORM, and do not force the query execution to jump around stages. You can use any one of the three and achieve the same functional results; they exist purely for query readability. For instance, the REDUCE clause in this recipe could be rewritten as SELECT TRANSFORM (mapout.loc, mapout.event_date, mapout.event_type) USING 'python calc_longest_nonviolent_period.py' AS loc, start_date, end_date, days with identical results.

See also

  • The Using Hive and Python to clean and transform geographical event data recipe in Chapter 3, Extracting and Transforming Data