Directed Acyclic Graphs

The idea behind Dask is quite similar to what we already saw in the last chapter with Theano and TensorFlow. We can use a familiar Pythonic API to build an execution plan, and the framework will automatically split the workflow into tasks that are shipped to and executed on multiple processes or machines.

Dask expresses its variables and operations as a Directed Acyclic Graph (DAG) that can be represented through a simple Python dictionary. To briefly illustrate how this works, we will implement the sum of two numbers with Dask. We will define our computational graph by storing the values of our input variables in the dictionary. The a and b input variables will be given a value of 2:

    dsk = {
        "a": 2,
        "b": 2,
    }

Each variable represents a node in the DAG. The next step in building our DAG is to execute operations on the nodes we just defined. In Dask, a task is defined by placing a tuple containing a Python function and its positional arguments in the dsk dictionary. To implement a sum, we can add a new node named result (the actual name is completely arbitrary) with a tuple containing the function we intend to execute, followed by its arguments. This is illustrated in the following code:

    dsk = {
        "a": 2,
        "b": 2,
        "result": (lambda x, y: x + y, "a", "b")
    }

For better style and clarity, we can compute the sum by replacing the lambda expression with the operator.add function from the standard library:

    from operator import add

    dsk = {
        "a": 2,
        "b": 2,
        "result": (add, "a", "b")
    }

It's important to note that the arguments we pass to the function are the "a" and "b" strings, which refer to the a and b nodes in the graph. Note that we didn't use any Dask-specific functions to define the DAG; this is the first indication of how flexible and lean the framework is, since all manipulations are performed on simple and familiar Python dictionaries.
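Because keys can refer to other keys, tasks can also be chained: a task's arguments may name nodes that are themselves tasks. As a sketch (the doubled key and the mul function are illustrative choices, not part of the original example), a graph that doubles the sum could look like this:

```python
from operator import add, mul

# Keys can reference other task nodes: "doubled" consumes the
# value produced by the "result" task.
dsk = {
    "a": 2,
    "b": 2,
    "result": (add, "a", "b"),      # computes 2 + 2
    "doubled": (mul, "result", 2),  # computes result * 2
}
```

When a scheduler is asked for "doubled", it first resolves the "result" dependency, so execution order follows the graph's edges rather than the order of the dictionary entries.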

The execution of tasks is performed by a scheduler, which is a function that takes a DAG and the task or tasks we'd like to compute and returns the computed value. The simplest Dask scheduler is the dask.get function, which can be used as follows:

    import dask

    res = dask.get(dsk, "result")
    print(res)
    # Output:
    # 4

All the complexity is hidden behind the scheduler, which will take care of distributing the tasks across threads, processes, or even different machines. The dask.get scheduler is a synchronous and serial implementation that is useful for testing and debugging purposes.
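Other schedulers share the same (graph, key) call signature, so they can be swapped in without changing the graph itself. As a minimal sketch (assuming Dask is installed), the thread-based scheduler can replace dask.get directly:

```python
from operator import add

from dask.threaded import get as threaded_get

dsk = {
    "a": 2,
    "b": 2,
    "result": (add, "a", "b"),
}

# Same (graph, key) signature as dask.get, but tasks are
# dispatched to a pool of worker threads.
res = threaded_get(dsk, "result")
print(res)  # 4
```

This is the key design idea: the graph is pure data, so deciding *how* it runs is a separate, swappable concern.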

Defining graphs using a simple dictionary is useful for understanding how Dask does its magic and for debugging purposes. Raw dictionaries can also be used to implement more complex algorithms not covered by the Dask API. Next, we will learn how Dask is capable of generating tasks automatically through a familiar NumPy- and Pandas-like interface.
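As a brief preview (a minimal sketch, assuming the dask.array subpackage is available), a NumPy-like expression builds such a graph behind the scenes; we never write the dictionary by hand:

```python
import dask.array as da

# A chunked array: Dask generates one task per chunk automatically.
x = da.ones(8, chunks=4)
total = x.sum()

# The underlying DAG is still a plain mapping of keys to tasks.
print(len(dict(total.__dask_graph__())))  # number of generated tasks
print(total.compute())  # 8.0
```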
