Using Apache Pig to sessionize web server log data

A session represents a user's continuous interaction with a website; the session ends after a configurable period of inactivity, and a new session begins when the user returns to the website. This recipe will use Apache Pig and a Pig user-defined function (UDF) to generate the subset of records from apache_nobots_tsv.txt that mark the beginning of a session for each IP address.

Getting ready

You will need to download/compile/install the following:

How to do it...

The following are the steps to create an Apache Pig UDF to sessionize web server log data:

  1. Start by creating a Pig UDF to emit only the first record of each session. The UDF extends the Pig abstract class EvalFunc and implements the Pig Accumulator interface. This class is responsible for applying the session logic to the web server log dataset:
    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class Sessionize extends EvalFunc<DataBag> implements Accumulator<DataBag> {
        
        private long sessionLength = 0;
        private Long lastSession = null;
        private DataBag sessionBag = null;
        
        public Sessionize(String seconds) {
            sessionLength = Long.parseLong(seconds) * 1000L;
            sessionBag = BagFactory.getInstance().newDefaultBag();
        }
    
        @Override
        public DataBag exec(Tuple tuple) throws IOException {
            accumulate(tuple);
            DataBag bag = getValue();
            cleanup();
            return bag;
        }
    
        @Override
        public void accumulate(Tuple tuple) throws IOException {
            if (tuple == null || tuple.size() == 0) {
                return;
            }
            DataBag inputBag = (DataBag) tuple.get(0);
            for(Tuple t: inputBag) {
                Long timestamp = (Long)t.get(1);
                // Emit the tuple if it is the first record seen for this IP,
                // or if the gap since the previous hit reaches the timeout
                if (lastSession == null || (timestamp - lastSession) >= sessionLength) {
                    sessionBag.add(t);
                }
                lastSession = timestamp;
            }
        }
    
        @Override
        public DataBag getValue() {
            return sessionBag;
        }

        @Override
        public void cleanup() {
            lastSession = null;
            sessionBag = BagFactory.getInstance().newDefaultBag();
        }
    }
  2. Next, create a Pig script to load and group the web server log records by IP address:
    register myjar.jar;
    define Sessionize com.packt.ch3.etl.pig.Sessionize('1800'); /* 30 minutes */
    
    nobots_weblogs = LOAD '/user/hadoop/apache_nobots_tsv.txt' AS (ip: chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
    
    ip_groups = GROUP nobots_weblogs BY ip;
  3. Finally, write the Pig expression to order all of the records associated with a specific IP by timestamp. Then, send the ordered records to the Sessionize UDF:
    sessions = FOREACH ip_groups {
                    ordered_by_timestamp = ORDER nobots_weblogs BY timestamp;
                    GENERATE FLATTEN(Sessionize(ordered_by_timestamp));
               };
    
    STORE sessions INTO '/user/jowens/sessions';
  4. Copy the JAR file containing the Sessionize class to the current working directory, and run the Pig script:
    $ pig -f sessionize.pig
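Before running against HDFS, the script can be sanity-checked in Pig's local mode, which reads from the local filesystem and prints results to the console. The following sketch assumes a copy of apache_nobots_tsv.txt sits in the current working directory (the filename sessionize_local.pig is arbitrary), and swaps STORE for DUMP to inspect the output directly:

```pig
-- sessionize_local.pig: local-mode sanity check (assumes ./apache_nobots_tsv.txt)
register myjar.jar;
define Sessionize com.packt.ch3.etl.pig.Sessionize('1800'); /* 30 minutes */

nobots_weblogs = LOAD 'apache_nobots_tsv.txt' AS (ip: chararray, timestamp:long, page:chararray, http_status:int, payload_size:int, useragent:chararray);
ip_groups = GROUP nobots_weblogs BY ip;
sessions = FOREACH ip_groups {
               ordered_by_timestamp = ORDER nobots_weblogs BY timestamp;
               GENERATE FLATTEN(Sessionize(ordered_by_timestamp));
           };
DUMP sessions;
```

Run it with `$ pig -x local -f sessionize_local.pig`; no Hadoop cluster is required in local mode.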

How it works...

We first created a UDF that extends the EvalFunc abstract class and implements the Accumulator interface. The EvalFunc class is how we define a custom function that can be called from a Pig script; data is passed to the UDF via the exec(Tuple t) method, where it is processed. The Accumulator interface is optional for custom eval functions, and it allows Pig to optimize the data flow and memory utilization of the UDF: instead of receiving the entire bag at once, as a plain EvalFunc does, an Accumulator-aware UDF receives the data in batches via accumulate(), with getValue() and cleanup() called once the whole group has been consumed.
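The session-start comparison inside accumulate() can be exercised on its own, without Pig. The following is a minimal standalone sketch (the class and method names are hypothetical, and plain long timestamps stand in for Pig tuples); it mirrors the UDF's rule that a record starts a session when it is the first one seen or when the gap since the previous record reaches the timeout:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the Sessionize UDF's comparison logic.
// SessionStarts and sessionStarts() are illustrative names, not part of Pig.
public class SessionStarts {

    // Given timestamps (millis) already sorted ascending, return the
    // timestamps that begin a new session for the given timeout (millis).
    public static List<Long> sessionStarts(long[] sortedTimestamps, long timeoutMillis) {
        List<Long> starts = new ArrayList<>();
        Long last = null;
        for (long ts : sortedTimestamps) {
            // First record, or gap at least as long as the timeout: new session.
            if (last == null || ts - last >= timeoutMillis) {
                starts.add(ts);
            }
            last = ts;
        }
        return starts;
    }

    public static void main(String[] args) {
        long timeout = 1800L * 1000L; // 30 minutes, as in the DEFINE statement
        // Hits at t=0s, 60s, 2000s, 2100s: the 1940s gap starts a second session.
        long[] ts = {0L, 60_000L, 2_000_000L, 2_100_000L};
        System.out.println(sessionStarts(ts, timeout)); // prints [0, 2000000]
    }
}
```

Because the rule only compares each timestamp to its immediate predecessor, the input must be sorted, which is exactly why the Pig script orders each IP's records by timestamp before calling the UDF.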

Next, we wrote a Pig script to group all of the web server log records by IP, and sort the records by timestamp. We need the data sorted by timestamp because the Sessionize UDF uses the sorted order of the timestamps to determine the start of each session.

Then, we generated the session-start records associated with each IP by calling the Sessionize alias.

Finally, we used the FLATTEN operator to unnest the Tuples in the DataBags emitted from the UDF.
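To illustrate with made-up records (the IPs, timestamps, and fields shown are hypothetical), each group initially yields one tuple holding a bag, and FLATTEN unnests that bag into one output row per session-start record:

```
-- one row per IP group, holding a bag (before FLATTEN):
({(192.168.1.1,1102271200000,...),(192.168.1.1,1102275000000,...)})

-- one row per session-start record (after FLATTEN):
(192.168.1.1,1102271200000,...)
(192.168.1.1,1102275000000,...)
```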

See also

  • Using Python to extend Apache Pig functionality