Defining new Cascalog operators

Cascalog comes with a number of operators; however, you'll often need to define your own, as we saw in the Aggregating data in Cascalog recipe.

For different uses, Cascalog defines a number of different categories of operators, each with different properties. Some are run in the map phase of processing, and some are run in the reduce phase. The ones in the map phase can use a number of extra optimizations, so if you can push some of your processing into that stage, you'll get better performance. In this recipe, you'll see which categories of operators are on the map side and which are on the reduce side. We'll also provide an example of each and see how they fit into the larger processing model.

Getting ready

For this recipe, we'll use the same dependencies and inclusions that we did in the Initializing Cascalog and Hadoop for distributed processing recipe. We'll also use the Doctor Who companion data from that recipe.

How to do it…

As I mentioned, Cascalog allows you to specify a number of different operator types. Each type is used in a different situation and with different classes of problems and operations. Let's take a look at each type of operator.

Creating map operators

Map operators transform data in the map phase, with one input row being mapped to one output row. A simple example of a custom map operator is an operator that triples all the numbers that pass through it:

(defmapfn triple-value [x] (* 3 x))

Something similar to this can be used to rescale all the values in a field.
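
To make the one-row-in, one-row-out contract concrete, here is a minimal sketch in plain Clojure that applies the same function body to a few values. It does not use Cascalog, and the names triple and sample-values are assumptions made only for this illustration.

```clojure
;; Plain-Clojure stand-in for the body of triple-value (hypothetical name).
(defn triple [x] (* 3 x))

;; Hypothetical sample field values, one per input row.
(def sample-values [1 2 3])

;; A map operator yields exactly one output per input, much like map.
(def tripled (mapv triple sample-values))
;; tripled is [3 6 9]
```

The output has the same number of elements as the input, which is the property that distinguishes a map operator from the other operator types in this recipe.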

Creating map concatenation operators

Map concatenation operators transform data in the map phase, but each input row can map to one output row, many output rows, or none. These operators return a sequence, and each item in the sequence becomes a new output row. For example, this operator splits a string on whitespace, and each token is a new output row. We'll use it in the following query to count the number of names that each companion had:

;; Assumes clojure.string is required with the alias string.
(defmapcatfn split [string] (string/split string #"\s+"))
(?<- (stdout)
     [?name ?count]
     (full-name _ ?name)
     (split ?name :> ?token)
     (c/count ?count))
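
The row-expanding behavior can be sketched outside Cascalog with clojure.core/mapcat. This is only an approximation of what the operator does inside a query; split-tokens and the sample names are hypothetical and not part of the recipe's data.

```clojure
(require '[clojure.string :as string])

;; Same body as the split operator, written as an ordinary function.
(defn split-tokens [s] (string/split s #"\s+"))

;; Hypothetical sample rows of full names.
(def names ["Sarah Jane Smith" "Rose Tyler"])

;; Each input row expands into several output rows, like mapcat.
(def tokens (vec (mapcat split-tokens names)))
;; tokens is ["Sarah" "Jane" "Smith" "Rose" "Tyler"]
```

Two input rows become five output rows, and counting those rows per name is what the query above relies on.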

Creating filter operators

Filter operators remove rows from the output in the map phase. They take one item, and they return a Boolean. If true, the item should be included in the results; if false, then the item should be held back. For example, this filter returns true if the input is an even number, and we use it in a query to return only the companions who accompanied an even number of doctors:

(deffilterfn is-even? [x] (even? x))
(?<- (stdout)
     [?companion ?dr-count]
     (doctor ?companion _)
     (c/count ?dr-count)
     (is-even? ?dr-count))

Creating buffer operators

Buffer operators work in the reduce phase. They process a group of rows as a single input: they take the entire list of input rows for the group and return one or more items for the output. For example, this buffer operator takes rows of strings and returns the total number of characters in all the strings. We use it in the query to count the characters in the full names of the companions for each doctor:

(defbufferfn count-chars [strings]
  [(reduce + 0 (mapcat #(map count %) strings))])
(?<- (stdout)
     [?dr ?companion-chars]
     (doctor ?c ?dr)
     (full-name ?c ?name)
     (count-chars ?name :> ?companion-chars))
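
As a rough sketch of what the buffer receives, the following plain-Clojure version runs the same body over one hand-built group. It assumes that each row arrives as a sequence of field values, which is why the body maps count over every row. The function name and the sample group are hypothetical.

```clojure
;; Same body as count-chars, as an ordinary function over a group of rows.
(defn count-chars-in-group [tuples]
  [(reduce + 0 (mapcat #(map count %) tuples))])

;; Hypothetical group: the ?name rows collected for one doctor.
(def group [["Rose Tyler"] ["Martha Jones"]])

(def result (count-chars-in-group group))
;; result is [22]: 10 characters plus 12 characters, spaces included
```

The whole group is handed over at once, and the single-element vector that comes back is the one output row for that group.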

Creating aggregate operators

Aggregate operators work in the reduce phase to combine input rows into one value. Compared to buffer operators, they are more flexible in some ways (they can be used with other aggregators) but more restricted in others.

Each aggregator function must support being called with zero, one, or two parameters. The call with no parameters returns the initial state. The call with two parameters takes the state and a new value and folds them together into a new state. The call with one parameter takes the final state and returns the result.

For example, this returns the average length of the names of each doctor's companions:

(defaggregatefn mean-count
  ([] [0 0])
  ([[n total] string]
   [(inc n) (+ total (count string))])
  ([[n total]] [(float (/ total n))]))
(?<- (stdout)
     [?dr ?companion-chars]
     (doctor ?c ?dr)
     (full-name ?c ?name)
     (mean-count ?name :> ?companion-chars))
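
The three-arity contract can be exercised by hand in plain Clojure. The sketch below is an assumption about how the calls are sequenced, based on the description above, rather than a view into Cascalog's internals. mean-count-fn and the sample names are hypothetical.

```clojure
;; The three arities of mean-count, written as an ordinary function.
(defn mean-count-fn
  ([] [0 0])                                     ; initial state
  ([[n total] s] [(inc n) (+ total (count s))])  ; fold one value into the state
  ([[n total]] [(float (/ total n))]))           ; final state to result

;; Hypothetical names for one doctor's companions.
(def names ["Rose Tyler" "Martha Jones"])

;; Start from the zero-argument call, fold each value in, then finish.
(def final-state (reduce mean-count-fn (mean-count-fn) names))
(def mean (mean-count-fn final-state))
```

Here final-state is the pair of row count and character total, and mean is a one-element vector holding their quotient.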

Creating parallel aggregate operators

Parallel aggregate operators are the most restricted, but they also give the best performance. Unlike buffer and aggregate operators, they can be run in the map phase of the computation. These aggregators are defined by two functions: one is called on each row, and the other combines the results of calling the first function on two rows.

This example returns the average length of the names of each doctor's companions:

  1. First, you have to define the aggregator functions as named functions. Cascalog serializes them as names, so you can't use anonymous functions:
    (defn mean-init [x] [1 (count x)])
    (defn mean-step [n1 t1 n2 t2] [(+ n1 n2) (+ t1 t2)])
  2. Then use these vars to define the parallel aggregator:
    (defparallelagg mean-count-p
      :init-var #'mean-init
      :combine-var #'mean-step)
  3. The aggregator returns both the item count and the total number of characters, so you have to divide the two in the query that calls the aggregator:
    (?<- (stdout) [?dr ?companion-chars]
         (doctor ?c ?dr)
         (full-name ?c ?name)
         (mean-count-p ?name :> ?n ?total)
         (div ?total ?n :> ?companion-chars))
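
To see why this shape of aggregator can be split across phases, the following plain-Clojure sketch combines partial results in two different groupings and gets the same pair. It only models the idea and does not call Cascalog. The combine helper and the sample names are hypothetical, and mean-init and mean-step are repeated so that the sketch runs on its own.

```clojure
;; The init and combine functions from the steps above.
(defn mean-init [x] [1 (count x)])
(defn mean-step [n1 t1 n2 t2] [(+ n1 n2) (+ t1 t2)])

;; Hypothetical helper: combine two partial results, each an [n total] pair.
(defn combine [[n1 t1] [n2 t2]] (mean-step n1 t1 n2 t2))

(def names ["Rose Tyler" "Martha Jones" "Donna Noble"])

;; Combining every row's initial pair in one pass...
(def all-at-once (reduce combine (map mean-init names)))

;; ...matches combining two partial batches first and merging them afterward.
(def in-batches
  (combine (reduce combine (map mean-init (take 2 names)))
           (reduce combine (map mean-init (drop 2 names)))))
```

Because the grouping does not change the result, partial pairs can be computed before the reduce phase and merged later, with the final division left to the query.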

Having so many options to build operators provides us with a lot of flexibility and power in how we define and create queries and transformations in Cascalog. This allows you to create powerful, custom workflows.
