Using jug to break up your pipeline into tasks

Often, we have a simple pipeline: we preprocess the initial data, compute features, and then we need to call a machine learning algorithm with the resulting features.

Jug is a package developed by Luis Pedro Coelho, one of the authors of this book. It is open source (using the liberal MIT License) and can be useful in many areas but was designed specifically around data analysis problems. It simultaneously solves several problems, for example:

  • It can memoize results to a disk (or a database), which means that if you ask it to compute something you have computed before, the result is instead read from the disk.
  • It can use multiple cores or even multiple computers on a cluster. Jug was also designed to work very well in batch computing environments that use a queuing system such as Portable Batch System (PBS), the Load Sharing Facility (LSF), or the Oracle Grid Engine (OGE, earlier known as Sun Grid Engine). This will be used in the second half of the chapter as we build online clusters and dispatch jobs to them.

About tasks

Tasks are the basic building block of jug. A task is just a function and values for its arguments, for example:

def double(x):
    return 2*x

A task could be "call double with argument 3". Another task would be "call double with argument 642.34". Using jug, we can build these tasks as follows:

from jug import Task
t1 = Task(double, 3)
t2 = Task(double, 642.34)

Save this to a file called jugfile.py (which is just a regular Python file). Now, we can run jug execute to run the tasks. This is something you run on the command line, not at the Python prompt! Running python jugfile.py directly would do nothing useful; instead, you run jug execute.
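To make the distinction concrete, these are the two invocations at the shell prompt:

python jugfile.py
jug execute

The first merely builds the Task objects and exits; only the second actually runs the tasks (and memoizes the results).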

You will also get some feedback on the tasks (jug will say that two tasks named "double" were run). Run jug execute again and it will tell you that it did nothing! It does not need to. In this case, we gained little, but if the tasks took a long time to compute, it would be very useful.

You may notice that a new directory named jugfile.jugdata also appeared on your hard drive, with a few other weirdly named files. This is the memoization cache. If you remove it, jug execute will run all your tasks (both of them) again.
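For example, forcing a full recomputation is just a matter of removing that directory before executing again:

rm -rf jugfile.jugdata
jug execute

Both double tasks will then be run from scratch.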

Note

Often, it is good to distinguish pure functions, which simply take their inputs and return a result, from more general functions that can perform actions such as reading from files, writing to files, accessing global variables, modifying their arguments, or anything else the language allows. Some programming languages, such as Haskell, even have syntactic ways to distinguish pure from impure functions.

With jug, your tasks do not need to be perfectly pure. It is even recommended that you use tasks to read your data or write your results. However, accessing and modifying global variables will not work well; the tasks may be run in any order on different processors. The exception is global constants, but even these may confuse the memoization system (if the value is changed between runs). Similarly, you should not modify the input values. Jug has a debug mode (use jug execute --debug), which slows down your computation but will give you useful error messages if you make this sort of mistake.
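As an illustration, the following sketch contrasts a task that depends on a mutable global variable (and will therefore behave unpredictably when tasks run in separate processes) with one that receives everything it needs as arguments. The function names are only for illustration:

from jug import TaskGenerator

counter = 0

@TaskGenerator
def bad_task(x):
    # Relies on (and mutates) a global variable: the result depends on how
    # many tasks happened to run in this particular process, so the
    # memoized value is essentially meaningless.
    global counter
    counter += 1
    return x + counter

@TaskGenerator
def good_task(x, increment):
    # Everything the task needs comes in through its arguments, so the
    # same inputs always produce the same result, wherever it runs.
    return x + increment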

The code shown earlier works, but it is a bit cumbersome; you are always repeating the Task(function, argument) construct. Using a bit of Python magic, we can make the code even more natural:

from jug import TaskGenerator
from time import sleep

@TaskGenerator
def double(x):
    sleep(4)
    return 2*x

@TaskGenerator
def add(a, b):
    return a + b

@TaskGenerator
def print_final_result(oname, value):
    with open(oname, 'w') as output:
        print("Final result:", value, file=output)

y = double(2)
z = double(y)

y2 = double(7)
z2 = double(y2)
print_final_result('output.txt', add(z,z2))

Except for the use of TaskGenerator, the preceding code could be a standard Python file. However, using TaskGenerator, it actually creates a series of tasks, and it is now possible to run it in a way that takes advantage of multiple processors. Behind the scenes, the decorator transforms your functions so that calling them does not actually execute the computation but creates a task instead. We also take advantage of the fact that we can pass tasks to other tasks; this generates a dependency between them.

You may have noticed that we added a sleep(4) call to double in the preceding code. This simulates running a long computation. Otherwise, this code is so fast that there is no point in using multiple processors.

We start by running jug status to check on the state of our tasks.

Now we start two processes simultaneously (in the background):

jug execute &
jug execute &

Now we run jug status again.

We can see that the two initial double tasks are running at the same time. After about 8 seconds, the whole process will finish and the output.txt file will be written (each branch of the computation is a chain of two double calls at 4 seconds each, and the two branches run in parallel across the two processes).

By the way, if your file was called anything other than jugfile.py, you would then have to specify it explicitly on the command line:

jug execute MYFILE.py 

This is the only disadvantage of not using the name jugfile.py.

Reusing partial results

For example, let's say you want to add a new feature (or even a set of features). As we saw in Chapter 10, Computer Vision – Pattern Recognition, this can easily be done by changing the feature computation code. However, doing so would normally imply recomputing all the features, which is wasteful, particularly if you want to test new features and techniques quickly (this snippet uses the hfeatures and as_array helpers defined later in this chapter, in the Using jug for data analysis section):

@TaskGenerator
def new_features(fname):
    import mahotas as mh
    import numpy as np
    im = mh.imread(fname, as_grey=1)
    es = mh.sobel(im, just_filter=1)
    return np.array([np.dot(es.ravel(), es.ravel())])

haralicks = as_array([hfeatures(f) for f in filenames])
efeatures = as_array([new_features(f) for f in filenames])
features = Task(np.hstack, [haralicks, efeatures])
 # learning code...

Now, when you run jug execute again, the new features will be computed, but the old features will be loaded from the cache. The logistic regression code will also be run again, as its results depend on the features, and those are different now.

This is when jug can be very powerful; it ensures that you always get the results you want without wasteful overcomputation.

Looking under the hood

How does jug work? At the basic level, it is very simple; a task is a function plus its arguments. Those arguments may be either values or other tasks. If a task takes another task as an argument, there is a dependency between the two (and the dependent task cannot be run until the results of the first task are available).

Based on this, jug recursively computes a hash for each task. This hash value encodes the whole computation to get there. When you run jug execute, there is a little loop as shown in the following code snippet:

for t in alltasks:
    # only run a task if it has not run yet and its result is not
    # already stored in the backend
    if t.has_not_run() and not backend_has_value(t.hash()):
        value = t.execute()
        # store the result under the task's hash so future runs can reuse it
        save_to_backend(value, key=t.hash())

The real loop is much more complex because of locking issues, but the general idea is the one that appears in the preceding code snippet.

The default backend writes its files to disk (in that funny directory named jugfile.jugdata/), but another backend is available which uses a Redis database. With proper locking, which jug takes care of, this also allows many processes to execute tasks; they will independently look at all the tasks, run the ones which have not run yet, and then write the results back to the shared backend. This works either on the same machine (using multicore processors) or on multiple machines, as long as they all have access to the same backend (for example, using a network disk or a Redis database). In the second half of this chapter, we will discuss computer clusters, but for now, let us focus on multiple cores.
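If you do want to try the Redis backend, the jug documentation describes pointing the --jugdir option at a Redis server instead of a local directory; the invocation looks roughly like the following (the exact address format is given in the jug documentation, and localhost here is just a placeholder):

jug execute --jugdir=redis://localhost/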

You can also understand why it is able to memoize intermediate results. If the backend already has the result of a task, it is not run anymore. On the other hand, if you change the task, even in minute ways (by altering one of the parameters), its hash will change. Therefore, it will be recomputed. Furthermore, all tasks that depend on it will also have their hashes changed and they will be recomputed as well.
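To make the idea concrete, here is a toy sketch of such a recursive hash. This is not jug's actual implementation; SimpleTask is a stand-in class defined only for this illustration:

import hashlib

class SimpleTask:
    """A toy stand-in for a jug task, used only to illustrate recursive hashing."""
    def __init__(self, f, *args):
        self.f = f
        self.args = args

    def hash(self):
        # The digest covers the function name and every argument; when an
        # argument is itself a task, its own hash is used, so the result
        # encodes the entire upstream computation.
        h = hashlib.sha1()
        h.update(self.f.__name__.encode('utf-8'))
        for arg in self.args:
            if isinstance(arg, SimpleTask):
                h.update(arg.hash().encode('utf-8'))
            else:
                h.update(repr(arg).encode('utf-8'))
        return h.hexdigest()

def double(x):
    return 2 * x

t1 = SimpleTask(double, 3)
t2 = SimpleTask(double, t1)  # changing t1's argument would change both hashes
print(t1.hash(), t2.hash())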

Using jug for data analysis

Jug is a generic framework, but it is ideally suited for medium-scale data analysis. As you develop your analysis pipeline, it is good to have intermediate results be saved. If you already computed the preprocessing step before and are only changing the features you compute, you do not want to recompute the preprocessing. If you already computed the features but want to try combining a few new ones into the mix, you also do not want to recompute all your other features.

Jug is also specially optimized to work with numpy arrays. So, whenever your tasks return or receive numpy arrays, you are taking advantage of this optimization. Jug is another piece of this ecosystem where everything works together.

We will now look back at Chapter 10, Computer Vision – Pattern Recognition, where we learned how to compute features on images. Remember that we were loading image files, computing features, combining them, normalizing them, and finally learning how to create a classifier. We are going to redo that exercise, but this time with the use of jug. The advantage of this version is that it is now possible to add a new feature without having to recompute all the previous ones.

We start with a few imports as follows:

from jug import TaskGenerator

Now we define the first task generator, the feature computation:

@TaskGenerator
def hfeatures(fname):
    import mahotas as mh
    import numpy as np
    im = mh.imread(fname, as_grey=1)
    im = mh.stretch(im)
    h = mh.features.haralick(im)
    return np.hstack([h.ptp(0), h.mean(0)])

Note how we only imported numpy and mahotas inside the function. This is a small optimization; this way, only if the task is run are the modules loaded. Now we set up the image filenames as follows:

from glob import glob
filenames = glob('dataset/*.jpg')

We can use TaskGenerator on any function, even on the ones that we did not write, such as numpy.array:

import numpy as np
as_array = TaskGenerator(np.array)

# compute all features:
features = as_array([hfeatures(f) for f in filenames])

# get the labels for each image
# (label_for and perform_cross_validation are assumed to be defined as in Chapter 10)
labels = [label_for(f) for f in filenames]
res = perform_cross_validation(features, labels)

@TaskGenerator
def write_result(ofname, value):
    with open(ofname, 'w') as out:
        print("Result is:", value, file=out)

write_result('output.txt', res)

One small inconvenience of using jug is that we must always write functions to output the results to files, as shown in the preceding examples. This is a small price to pay for the extra convenience jug provides.

Note

Not all of jug's features can be covered in this chapter, but here is a summary of the most potentially interesting ones that we did not cover in the main text:

  • jug invalidate: This feature declares that all results from a given function should be considered invalid and in need of recomputation. This will also recompute any downstream computation that depended (even indirectly) on the invalidated results.
  • jug status --cache: If jug status takes too long, you can use the --cache flag to cache the status and speed it up. Note that this will not detect any changes to jugfile.py, but you can always use --cache --clear to remove the cache and start again.
  • jug cleanup: This feature removes any extra files in the memoization cache. This is a garbage collection operation.

There are other more advanced features, which allow you to look at values that have been computed inside jugfile.py. Read up on the use of "barriers" in the jug documentation (online at http://jug.rtfd.org).
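As a taste of what barriers look like, here is a minimal sketch, assuming jug's barrier and value helpers behave as described in its documentation:

from jug import TaskGenerator, barrier, value

@TaskGenerator
def double(x):
    return 2 * x

partials = [double(i) for i in range(10)]

# Nothing below this point is even considered until every task created
# above it has been computed.
barrier()

# value() recursively replaces tasks with their computed results, so from
# here on we are working with plain numbers again.
total = sum(value(partials))
print("Sum of doubled values:", total)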
