Using Hive and Python to clean and transform geographical event data

This recipe uses Hive's TRANSFORM and USING operators to pass data through a custom Python script. The script performs a few simple pruning operations on each row and outputs a slightly modified version of the row into a Hive table.

Getting ready

You will need a working Hadoop cluster with Hive installed and configured.

This recipe requires the Nigeria_ACLED.csv file to be loaded into a Hive table named acled_nigeria, with the following fields mapped to the respective data types.

Issue the following command to the Hive client:

describe acled_nigeria;

You should see the following response:

OK
loc          string
event_date   string
year         string
event_type   string
actor        string
latitude     double
longitude    double
source       string
fatalities   string

How to do it...

Follow these steps to use Python and Hive to transform data:

  1. Create a file named clean_and_transform_acled.hql in your local working directory and add the inline creation and transformation syntax:
    SET mapred.child.java.opts=-Xmx512M;
    
    DROP TABLE IF EXISTS acled_nigeria_cleaned;
    CREATE TABLE acled_nigeria_cleaned (
        loc STRING,
        event_date STRING,
        event_type STRING,
        actor STRING,
        latitude DOUBLE,
        longitude DOUBLE,
        source STRING,
        fatalities INT
    ) ROW FORMAT DELIMITED;
    
    ADD FILE ./clean_acled_nigeria.py;
    INSERT OVERWRITE TABLE acled_nigeria_cleaned
        SELECT TRANSFORM(
                if(loc != "", loc, 'Unknown'),
                event_date,
                year,
                event_type,
                actor,
                latitude,
                longitude,
                source,
                if(fatalities != "", fatalities, 'ZERO_FLAG'))
        USING 'python clean_acled_nigeria.py'
        AS (loc, event_date, event_type, actor, latitude, longitude, source, fatalities)
        FROM acled_nigeria;
  2. Next, create another file named clean_acled_nigeria.py in the same working directory as clean_and_transform_acled.hql and add the following Python code to read from stdin:
    #!/usr/bin/env python
    import sys
    
    for line in sys.stdin:
        (loc, event_date, year, event_type, actor, lat, lon, src, fatalities) = line.strip().split('\t')
        if loc != 'LOCATION':  # remove header row
            if fatalities == 'ZERO_FLAG':
                fatalities = '0'
            print('\t'.join([loc, event_date, event_type, actor, lat, lon, src, fatalities]))  # strip out year

    Note

    It is important to note that Python is sensitive to inconsistent indentation. Be careful if you are copying and pasting Python code.

  3. Run the script from the operating system shell by supplying the -f option to the Hive client:
    $ hive -f clean_and_transform_acled.hql
  4. To verify that the script finished properly, run the following command using the -e option to the Hive client:
    $ hive -e "select count(1) from acled_nigeria_cleaned"

    Hive should count 2931 rows.

How it works...

Let's start with the Hive script we created. The first line simply forces a specific JVM heap size for the MapReduce child tasks. You can set this to whatever size is appropriate for your cluster; for the ACLED Nigeria dataset, 512 MB is more than enough.

Immediately following this, we drop any table named acled_nigeria_cleaned and create a table of the same name. We can omit explicit FIELDS TERMINATED BY and LINES TERMINATED BY clauses, since ROW FORMAT DELIMITED supplies default field and row delimiters and the ACLED Nigeria data fits that format.

Once the receiving table is defined, we write the SELECT statement that will transform and output the data. The common convention is to add any scripts required by the SELECT statement before the statement itself. The ADD FILE ./clean_acled_nigeria.py command tells Hive to ship the script from the local filesystem to the distributed cache for use by the MapReduce tasks.

The SELECT statement uses the Hive TRANSFORM operator, which passes each column to the script as a tab-separated string and converts NULL values to the literal string '\N'. The columns loc and fatalities are conditionally checked for empty strings and, if found to be empty, are set to a default value.
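As an aside, if your source table ever contains true NULL values rather than empty strings, the script will receive the literal string '\N' for those columns. The following is a minimal sketch, not part of this recipe's script, of how a transform script could normalize that marker; the 'Unknown' replacement value is an arbitrary choice for illustration:

    #!/usr/bin/env python
    import sys

    NULL_TOKEN = '\\N'  # the literal string Hive emits for NULL columns over TRANSFORM

    for line in sys.stdin:
        fields = line.rstrip('\n').split('\t')
        # Replace Hive's NULL marker with a placeholder before any further processing
        fields = ['Unknown' if field == NULL_TOKEN else field for field in fields]
        print('\t'.join(fields))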

We use the USING operator to supply the custom script that works with the TRANSFORM operator. Hive requires that any query invoking a script through USING first list, in TRANSFORM, the columns to be passed to it. Since the file has been placed in the distributed cache, and assuming each node in the cluster has Python installed, the MapReduce tasks can execute the script and process rows in parallel. The AS operator lists the named fields corresponding to the columns in the receiving Hive table, acled_nigeria_cleaned.

The Python script is very straightforward. The #!/usr/bin/env python statement is a hint that tells the shell how to execute the script. Each row from the table arrives as a line over standard input. The call to strip() removes leading and trailing whitespace, and split('\t') tokenizes the line so that each field lands in a named variable. The raw ACLED Nigeria data used to create the input Hive table contains a header row we wish to discard; the first condition checks whether loc equals 'LOCATION', which identifies that header row so it can be ignored.

If the row passes this check, we look for the 'ZERO_FLAG' value for fatalities, which we set in the Hive script; when it is detected, the script replaces it with the string '0'.

Finally, we output each field excluding year in the same order as it was input. Each row will be placed into the table acled_nigeria_cleaned.

There's more...

There is a lot going on in this recipe. The following are a few additional explanations that will help you with Hive TRANSFORM/USING/AS operations and ETL in general.

Making every column type String

This is a bit counterintuitive and certainly not found anywhere in the Hive documentation. If your initial Hive staging table for incoming data maps each delimited field to a string, it will aid tremendously in data validation and debugging. You can use the Hive STRING type to represent almost any input to a cleansing script or a direct Hive QL statement. Trying to map exact datatypes onto expected values leaves no room for erroneous input: there may be malformed characters in fields where you expect numeric values, and other similar hang-ups that make certain analytics impossible. Using strings for the raw data fields allows a custom script to inspect the invalid data and decide how to respond. Moreover, when dealing with CSV or tab-separated data, a slightly misaligned INT or FLOAT mapping in your Hive table declaration, where the data actually holds a string, can lead to NULL values for every row. String mappings for every field in the raw table will surface column-misalignment failures such as these very quickly. This is just a matter of preference, and it only applies to tables designed to hold raw or dirty input for immediate validation and transformation into other Hive tables.
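For reference, here is a minimal sketch of what an all-STRING staging table for this dataset might look like; the table name acled_nigeria_raw is hypothetical, and the tab delimiter is an assumption about the incoming file's format:

    DROP TABLE IF EXISTS acled_nigeria_raw;
    CREATE TABLE acled_nigeria_raw (
        loc STRING,
        event_date STRING,
        year STRING,
        event_type STRING,
        actor STRING,
        latitude STRING,
        longitude STRING,
        source STRING,
        fatalities STRING
    ) ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t';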

Type casting values using the AS keyword

This recipe outputs only strings from the Python script over standard output, and Hive will attempt to cast them to the appropriate type in the receiving table. The advantage of this is the time and coding space saved by not having to explicitly cast every field in the AS clause. The disadvantage is that the cast will not fail when a value is incompatible with the target type. For instance, outputting HI THERE to a numeric field will silently insert NULL as the field value for that row, which can lead to undesirable behavior in subsequent SELECT statements over the table.
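If you would rather make the casts explicit, Hive also accepts column types inside the AS clause of a TRANSFORM query. A sketch of how the tail of this recipe's INSERT statement might look with types declared (incompatible values still become NULL, so this is a readability choice rather than a safety net):

    USING 'python clean_acled_nigeria.py'
    AS (loc STRING, event_date STRING, event_type STRING, actor STRING,
        latitude DOUBLE, longitude DOUBLE, source STRING, fatalities INT)
    FROM acled_nigeria;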

Testing the script locally

This one is pretty self-explanatory. It is much easier to debug your script directly on the command line than across MapReduce task error logs. Local testing likely will not spare you from troubleshooting issues of scale or data validity, but it will eliminate a large majority of compile-time and control-flow issues.
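For example, assuming you have saved a handful of tab-separated rows from the raw data into a file named sample_rows.tsv (a hypothetical file used only for this check, with columns in the same order as the TRANSFORM clause), you can pipe it through the script from the shell and inspect the output:

    $ cat sample_rows.tsv | python clean_acled_nigeria.py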
