You can run this notebook in a live session or view it on GitHub. In this lesson, we cover the basics of Dask. Our learning goals are as follows. By the end of the lesson, we will be able to:

• Identify and describe Dask Collections (Array, DataFrame) and Schedulers

• Work with Dask Arrays in much the same way you would work with a NumPy array

• Understand some of the tradeoffs surrounding chunk size, chunk shape, and computational overhead

• Deploy a local Dask Distributed Cluster and access the diagnostics dashboard

Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling, high-level big-data collections like dask.array and dask.dataframe, and an extensive suite of deployment options. Dask’s documentation can be found here: https://docs.dask.org/en/latest/

## Quick setup¶

For the purposes of this notebook, we’ll use a Dask Cluster to manage computations. The next cell sets up a simple LocalCluster. We’ll cover Dask schedulers and clusters later on in this notebook.

:

from dask.distributed import Client

client = Client()
client

:


### Cluster

• Workers: 2
• Cores: 2
• Memory: 8.36 GB


Xarray primarily interfaces with the Dask Array collection so we’ll skip the others for now. You can find out more about Dask’s user interfaces here.

Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting the large array up into many small arrays. This lets us compute on arrays larger than memory using multiple cores. We coordinate these blocked algorithms using Dask graphs. Dask arrays are also lazy, meaning that they do not evaluate anything until you explicitly ask for a result using the compute method.

If we want to create a NumPy array of all ones, we do it like this:

:

import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

:

array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])


This array contains exactly 32 MB of data:

:

print("%.1f MB" % (ones_np.nbytes / 1e6))

32.0 MB


Now let’s create the same array using Dask’s array interface.

:

import dask.array as da

ones = da.ones(shape)
ones

:

        Array          Chunk
Bytes   32.00 MB       32.00 MB
Shape   (1000, 4000)   (1000, 4000)
Count   1 Tasks        1 Chunks
Type    float64        numpy.ndarray

This works, but we didn’t tell Dask how to split up (or chunk) the array, so it is not optimized for parallel computation.

A crucial difference with Dask is that we must specify the chunks argument. “Chunks” describes how the array is split up over many sub-arrays. (source: Dask Array Documentation)

There are several ways to specify chunks. In this lesson, we will use a block shape.
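As a quick sketch of the other forms the chunks argument accepts (a single block size applied to every axis, or "auto" to let Dask pick a size based on its configured target chunk bytes):

```python
import dask.array as da

shape = (1000, 4000)

# A single integer applies the same block size along every axis.
a = da.ones(shape, chunks=1000)
print(a.numblocks)  # (1, 4)

# "auto" lets Dask choose a chunk size for you.
b = da.ones(shape, chunks="auto")

# A tuple gives an explicit per-axis block shape.
c = da.ones(shape, chunks=(500, 2000))
print(c.numblocks)  # (2, 2)
```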

:

chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

:

        Array          Chunk
Bytes   32.00 MB       8.00 MB
Shape   (1000, 4000)   (1000, 1000)
Count   4 Tasks        4 Chunks
Type    float64        numpy.ndarray

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a Dask array, the computation is triggered and the Dask array becomes a NumPy array.

:

ones.compute()

:

array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])


In order to understand what happened when we called .compute(), we can visualize the Dask graph, the symbolic operations that make up the array:

:

ones.visualize()

:

Our array has four chunks. To generate it, Dask calls np.ones four times and then concatenates these together into one array.
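This blocked strategy can be sketched by hand with plain NumPy. The following is a simplified illustration of the idea, not Dask’s actual implementation: compute on each block independently (these per-block steps are what Dask can run in parallel), then combine the results.

```python
import numpy as np

# A hand-rolled blocked sum over 1000-column blocks.
x = np.ones((1000, 4000))
block_size = 1000

# Sum each block independently; Dask would schedule these
# four tasks in parallel across workers.
block_sums = [
    x[:, j : j + block_size].sum()
    for j in range(0, x.shape[1], block_size)
]

# Combine the per-block results into the final answer.
total = sum(block_sums)
print(total)  # 4000000.0, same as x.sum()
```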

Rather than immediately loading a Dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:

:

sum_of_ones = ones.sum()
sum_of_ones.visualize()

:

### Exercise¶

Modify the chunk size (or shape) in the ones array and visualize how the task graph changes.

:

# your code here

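One possible approach, as a sketch: halving the chunk size along one axis doubles the number of chunks, and the task graph grows accordingly.

```python
import dask.array as da

shape = (1000, 4000)

# Smaller chunks: (500, 1000) instead of (1000, 1000)
ones_small_chunks = da.ones(shape, chunks=(500, 1000))
print(ones_small_chunks.numblocks)  # (2, 4) -> 8 chunks instead of 4

# ones_small_chunks.visualize() would now show 8 np.ones calls in
# the graph (rendering requires the optional graphviz package).
```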

Here we see Dask’s strategy for finding the sum. This simple example illustrates the beauty of Dask: it automatically designs an algorithm appropriate for custom operations with big data.

If we make our operation more complex, the graph gets more complex.

:

fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()

:

### A Bigger Calculation¶

The examples above were toy examples; the data (32 MB) is probably not big enough to warrant the use of Dask.

We can make it a lot bigger!

:

bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

:

        Array            Chunk
Bytes   6.40 GB          8.00 MB
Shape   (200000, 4000)   (1000, 1000)
Count   800 Tasks        800 Chunks
Type    float64          numpy.ndarray
:

print("%.1f MB" % (big_ones.nbytes / 1e6))

6400.0 MB


This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of RAM available in your computer. Nevertheless, Dask has no problem working with it.

Do not try to .visualize() this array!

When doing a big calculation, Dask also has some tools to help us understand what is happening under the hood. Let’s watch the dashboard as we do a bigger computation.

:

big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result

:

1.0


### Reduction¶

All the usual NumPy methods work on Dask arrays. You can also apply NumPy functions directly to a Dask array, and the result will stay lazy.

:

big_ones_reduce = (np.cos(big_ones) ** 2).mean(axis=1)
big_ones_reduce

:

        Array        Chunk
Bytes   1.60 MB      8.00 kB
Shape   (200000,)    (1000,)
Count   3400 Tasks   200 Chunks
Type    float64      numpy.ndarray

Plotting also triggers computation, since we need the actual values.

:

%matplotlib inline
from matplotlib import pyplot as plt

:

plt.plot(big_ones_reduce)

:

[<matplotlib.lines.Line2D at 0x7fb01a298910>]


## Parallelism using the dask.distributed scheduler¶

In the first cell of this notebook, we started a local Dask Cluster and Client. We skipped past some important details there that we’ll unpack now.

The Dask Schedulers orchestrate the tasks in the Task Graphs so that they can be run in parallel. How they run in parallel, though, is determined by which Scheduler you choose.

There are 3 local schedulers:

• Single-Thread Local: For debugging, profiling, and diagnosing issues

• Multi-threaded: Using the Python built-in threading package (the default for all Dask operations except Bags)

• Multi-process: Using the Python built-in multiprocessing package (the default for Dask Bags)

and 1 distributed scheduler, which we will talk about later:

• Distributed: Using the dask.distributed module (which uses tornado for communication over TCP). The distributed scheduler uses a Cluster to manage communication between the scheduler and the “workers”. This is described in the next section.

• LocalCluster - Creates a Cluster that can be executed locally. Each Cluster includes a Scheduler and Workers.

• Client - Connects to and drives computation on a distributed Cluster
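You normally pick a scheduler either by creating a Client (as at the top of this notebook) or by passing the scheduler keyword to compute. A minimal sketch of the local schedulers:

```python
import dask.array as da

x = da.ones((4, 4), chunks=(2, 2)).sum()

# Single-threaded local scheduler: everything runs in the main
# thread, which makes debugging and profiling straightforward.
result_single = x.compute(scheduler="single-threaded")

# Multi-threaded scheduler (the default for Dask arrays).
result_threads = x.compute(scheduler="threads")

# scheduler="processes" would use the multiprocessing-based
# scheduler instead (the default for Dask Bags).

print(result_single, result_threads)  # 16.0 16.0
```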

## Profiling & Diagnostics using the Dask Dashboard¶

You’ll recall from above that we opened a URL to the Dask Dashboard:

:

client

:


### Cluster

• Workers: 2
• Cores: 2
• Memory: 8.36 GB

The dashboard provided by the Dask distributed scheduler is an incredibly valuable tool for gaining insight into the performance of your computation and the cluster as a whole. In the dashboard, you’ll see a number of tabs:

• Status: Overview of the current state of the scheduler, including the active task stream, progress, memory per worker, and the number of tasks per worker.

• Workers: The workers tab allows you to track CPU and memory use per worker.

• System: Live tracking of system resources like CPU, memory, bandwidth, and open file descriptors.

• Profile: Fine-grained statistical profiling.

• Info: Worker status and logs.

Another useful diagnostic tool is Dask’s static performance report. This allows you to save a report, including the task stream, worker profiles, etc., for all or a specific part of a workflow. Below is an example of how you would create such a report:

:

from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    big_calc.compute()


### Exercise¶

Again, let’s modify the chunk size in big_ones (aim for ~100 MB). How does the performance report change with a larger chunk size?

:

# your code here

big_calc.compute()


## Distributed Dask clusters for HPC and Cloud environments¶

Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system. There is a growing ecosystem of Dask deployment projects that facilitate easy deployment and scaling of Dask clusters on a wide variety of computing systems.

### HPC¶

• dask_jobqueue.PBSCluster

• dask_jobqueue.SLURMCluster

• dask_jobqueue.LSFCluster

• etc.

• dask_mpi.initialize
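As an illustration only, a dask_jobqueue cluster on an HPC system is typically created like this. The queue name, walltime, and resource sizes below are hypothetical and site-specific; consult your site’s documentation for the real values.

```python
from dask.distributed import Client
from dask_jobqueue import PBSCluster

# All resource values here are hypothetical examples.
cluster = PBSCluster(
    cores=36,             # cores per batch job
    memory="100GB",       # memory per batch job
    queue="regular",      # hypothetical queue name
    walltime="01:00:00",
)
cluster.scale(jobs=2)     # request two batch jobs' worth of workers

client = Client(cluster)  # drive computations on the cluster
```

This sketch cannot run outside an HPC system with a PBS batch scheduler; the other dask_jobqueue classes (SLURMCluster, LSFCluster, etc.) follow the same pattern.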

### Cloud¶

• dask_kubernetes.KubeCluster

• dask_cloudprovider.FargateCluster

• dask_cloudprovider.ECSCluster

• dask_gateway.GatewayCluster