Dask distributed

The first iterations of the Dask project were designed to run on a single computer using a thread-based or a process-based scheduler. Recently, the implementation of a new distributed backend can be used to set up and run Dask graphs on a network of computers.

Dask distributed is not installed automatically with Dask. The library is available through the conda package manager (use the $ conda install distributed command ) as well as pip (with the $ pip install distributed command).

Getting started with Dask distributed is really easy. The most basic setup is obtained by instantiating a Client object:

    from dask.distributed import Client

client = Client()
# Result:
# <Client: scheduler='tcp://127.0.0.1:46472' processes=4 cores=4>

By default, Dask will start a few key processes (on the local machine) necessary for scheduling and executing distributed tasks through the Client instance. The main components of a Dask cluster are a single scheduler and a collection of workers.

The scheduler is the process responsible for distributing the work across the workers and to monitor and manage the results. Generally, when a task is submitted to the user, the scheduler will find a free worker and submit a task for execution. Once the worker is done, the scheduler is informed that the result is available.

A worker is a process that accepts incoming tasks and produces results. Workers can reside on different machines over the network. Workers execute tasks using ThreadPoolExecutor. This can be used to achieve parallelism when using functions that do not acquire the GIL (such as Numpy, Pandas, and Cython functions in nogil blocks). When executing pure Python code, it is advantageous to start many single-threaded worker processes as this will enable parallelism for code that acquires the GIL.

The Client class can be used to submit tasks manually to the scheduler using familiar asynchronous methods. For example, to submit a function for execution on the cluster, one can use the Client.map and Client.submit methods. In the following code, we demonstrate the use of Client.map and Client.submit to calculate the square of a few numbers. The Client will submit a series of tasks to the scheduler and we will receive a Future instance for each task:

    def square(x):
return x ** 2

fut = client.submit(square, 2)
# Result:
# <Future: status: pending, key: square-05236e00d545104559e0cd20f94cd8ab>

client.map(square)
futs = client.map(square, [0, 1, 2, 3, 4])
# Result:
# [<Future: status: pending, key: square-d043f00c1427622a694f518348870a2f>,
# <Future: status: pending, key: square-9352eac1fb1f6659e8442ca4838b6f8d>,
# <Future: status: finished, type: int, key:
# square-05236e00d545104559e0cd20f94cd8ab>,
# <Future: status: pending, key:
# square-c89f4c21ae6004ce0fe5206f1a8d619d>,
# <Future: status: pending, key:
# square-a66f1c13e2a46762b092a4f2922e9db9>]

So far, this is quite similar to what we saw in the earlier chapters with TheadPoolExecutor and ProcessPoolExecutor. Note however, that Dask Distributed not only submits the tasks, but also caches the computation results on the worker memory. You can see caching in action by looking at the preceding code example. When we first invoke client.submit, the square(2) task is created and its status is set to pending. When we subsequently invoke client.map,  the square(2) task is resubmitted to the scheduler, but this time, rather than recalculating its value, the scheduler directly retrieves the result for the worker. As a result, the third Future returned by map already has a finished status.

Results from a collection of Future instances can be retrieved using the Client.gather method:

    client.gather(futs)
# Result:
# [0, 1, 4, 9, 16]

Client can also be used to run arbitrary Dask graphs. For example, we can trivially run our approximation of pi by passing the client.get function as an optional argument to pi.compute:

    pi.compute(get=client.get)

This feature makes Dask extremely scalable as it is possible to develop and run algorithms on the local machine using one of the simpler schedulers and, in case the performance is not satisfactory, to reuse the same algorithms on a cluster of hundreds of machines.  

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.12.162.37