Introduction to Dask

In this lesson, we cover the basics of Dask. Our learning goals are as follows. By the end of the lesson, we will be able to:

  • Identify and describe Dask Collections (Array, DataFrame) and Schedulers

  • Work with Dask arrays in much the same way you would work with a NumPy array

  • Understand some of the tradeoffs surrounding chunk size, chunk shape, and computational overhead

  • Deploy a local Dask Distributed Cluster and access the diagnostics dashboard

What is Dask?

Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling, high-level big-data collections like dask.array and dask.dataframe, and an extensive suite of deployment options. Dask’s documentation can be found here: https://docs.dask.org/en/latest/

Dask overview

Quick setup

For the purposes of this notebook, we’ll use a Dask Cluster to manage computations. The next cell sets up a simple LocalCluster. We’ll cover Dask schedulers and clusters later on in this notebook.

[1]:
from dask.distributed import Client

client = Client()
client
[1]:

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 8.36 GB

👆

Click the Dashboard link above.

Dask Collections

Dask includes 3 main collections:

  • Dask Array: parallel NumPy arrays

  • Dask DataFrame: parallel pandas DataFrames

  • Dask Bag: parallel Python lists

Xarray primarily interfaces with the Dask Array collection, so we’ll skip the others for now. You can find out more about Dask’s user interfaces here.

Dask Arrays

Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using multiple cores. We coordinate these blocked algorithms using Dask graphs. Dask arrays are also lazy, meaning that they do not evaluate until you explicitly ask for a result using the compute method.

If we want to create a NumPy array of all ones, we do it like this:

[2]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
[2]:
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

This array contains exactly 32 MB of data:

[3]:
print("%.1f MB" % (ones_np.nbytes / 1e6))
32.0 MB

Now let’s create the same array using Dask’s array interface.

[4]:
import dask.array as da

ones = da.ones(shape)
ones
[4]:
              Array          Chunk
Bytes         32.00 MB       32.00 MB
Shape         (1000, 4000)   (1000, 4000)
Count         1 Tasks        1 Chunks
Type          float64        numpy.ndarray

This works, but we didn’t tell Dask how to split up (or chunk) the array, so it is not optimized for parallel computation.

A crucial difference with Dask is that we must specify the chunks argument. “Chunks” describes how the array is split up over many sub-arrays.

Dask Arrays (source: Dask Array Documentation)

There are several ways to specify chunks. In this lecture, we will use a block shape.
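
As an aside, here is a brief sketch of a few of the other ways chunks can be specified (these variants are not used further below; shape comes from the cells above, and the illustrative variants here simply produce the same array with different chunkings):

import dask.array as da

# Several ways to specify chunking for the same array (illustrative only):
da.ones(shape, chunks=1000)           # blocks of 1000 elements along each axis
da.ones(shape, chunks=(1000, 1000))   # an explicit block shape
da.ones(shape, chunks="auto")         # let Dask choose a reasonable chunking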

[5]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
[5]:
              Array          Chunk
Bytes         32.00 MB       8.00 MB
Shape         (1000, 4000)   (1000, 1000)
Count         4 Tasks        4 Chunks
Type          float64        numpy.ndarray

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a Dask array, the computation is triggered and the Dask array becomes a NumPy array.

[6]:
ones.compute()
[6]:
array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

To understand what happened when we called .compute(), we can visualize the Dask graph, the symbolic operations that make up the array.

[7]:
ones.visualize()
[7]:
../_images/scipy-tutorial_05_intro_to_dask_15_0.png

Our array has four chunks. To generate it, Dask calls np.ones four times and then concatenates them together into one array.

Rather than immediately loading a Dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:

[8]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()
[8]:
../_images/scipy-tutorial_05_intro_to_dask_17_0.png

Exercise

Modify the chunk size (or shape) in the ones array and visualize how the task graph changes.

[9]:
# your code here
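
One possible solution (a sketch; the chunk shape (500, 500) is just an illustrative choice):

# Rebuild the array with smaller chunks and look at how the graph of the sum
# changes: (500, 500) blocks give 16 chunks instead of 4, so more tasks appear.
ones_small = da.ones(shape, chunks=(500, 500))
ones_small.sum().visualize()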

Here we see Dask’s strategy for finding the sum. This simple example illustrates the beauty of Dask: it automatically designs an algorithm appropriate for custom operations with big data.

If we make our operation more complex, the graph gets more complex.

[10]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()
[10]:
../_images/scipy-tutorial_05_intro_to_dask_21_0.png

A Bigger Calculation

The examples above were toy examples; the data (32 MB) is probably not big enough to warrant the use of Dask.

We can make it a lot bigger!

[11]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones
[11]:
              Array            Chunk
Bytes         6.40 GB          8.00 MB
Shape         (200000, 4000)   (1000, 1000)
Count         800 Tasks        800 Chunks
Type          float64          numpy.ndarray
[12]:
print("%.1f MB" % (big_ones.nbytes / 1e6))
6400.0 MB

This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of RAM available on your computer. Nevertheless, Dask has no problem working on it.

Do not try to .visualize() this array!

When doing a big calculation, Dask also has some tools to help us understand what is happening under the hood. Let’s watch the dashboard again as we do a bigger computation.

[13]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

result = big_calc.compute()
result
[13]:
1.0

Reduction

All the usual NumPy methods work on Dask arrays. You can also apply NumPy functions directly to a Dask array, and the result will stay lazy.

[14]:
big_ones_reduce = (np.cos(big_ones) ** 2).mean(axis=1)
big_ones_reduce
[14]:
              Array        Chunk
Bytes         1.60 MB      8.00 kB
Shape         (200000,)    (1000,)
Count         3400 Tasks   200 Chunks
Type          float64      numpy.ndarray

Plotting also triggers computation, since we need the actual values.

[15]:
%matplotlib inline
from matplotlib import pyplot as plt
[16]:
plt.plot(big_ones_reduce)
[16]:
[<matplotlib.lines.Line2D at 0x7fb01a298910>]
../_images/scipy-tutorial_05_intro_to_dask_31_1.png

Parallelism using the dask.distributed scheduler

In the first cell of this notebook, we started a local Dask Cluster and Client. We skipped past some important details there that we’ll unpack now.

Dask Schedulers

The Dask Schedulers orchestrate the tasks in the Task Graphs so that they can be run in parallel. How they run in parallel, though, is determined by which Scheduler you choose.

There are 3 local schedulers:

  • Single-Thread Local: For debugging, profiling, and diagnosing issues

  • Multi-threaded: Using the Python built-in threading package (the default for all Dask operations except Bags)

  • Multi-process: Using the Python built-in multiprocessing package (the default for Dask Bags)

and 1 distributed scheduler, which we will talk about later:

  • Distributed: Using the dask.distributed module (which uses tornado for communication over TCP). The distributed scheduler uses a Cluster to manage communication between the scheduler and the “workers”. This is described in the next section.
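
For example, here is a sketch of how you could pick one of the schedulers listed above for a single computation. Passing scheduler= to compute() overrides the default (and once a distributed Client exists, as in this notebook, the default is the distributed scheduler). The sum_of_ones array comes from the cells above:

import dask

# Run the same small computation on different local schedulers (illustrative):
sum_of_ones.compute(scheduler="single-threaded")  # easiest to debug and profile
sum_of_ones.compute(scheduler="threads")          # multi-threaded
sum_of_ones.compute(scheduler="processes")        # multi-process

# Or set a scheduler for a whole block of code:
with dask.config.set(scheduler="threads"):
    sum_of_ones.compute()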

Distributed Clusters (http://distributed.dask.org/)

  • LocalCluster - Creates a Cluster that runs locally on your machine. Each Cluster includes a Scheduler and Workers.

  • Client - Connects to and drives computation on a distributed Cluster
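
For example, a minimal sketch of creating a LocalCluster explicitly and connecting a Client to it (we already created a Client with default settings in the first cell, so this is only illustrative; the worker counts and memory limit are placeholder values, not recommendations):

from dask.distributed import Client, LocalCluster

# A LocalCluster starts a scheduler and workers on this machine.
cluster = LocalCluster(n_workers=2, threads_per_worker=2, memory_limit="2GB")

# The Client connects to the cluster and drives computation on it.
client = Client(cluster)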

Profiling & Diagnostics using the Dask Dashboard

You’ll recall from above that we opened a URL to the Dask Dashboard:

[17]:
client
[17]:

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 8.36 GB

The dashboard the Dask distributed scheduler provides is an incredibly valuable tool for gaining insight into the performance of your computation and of the cluster as a whole. In the dashboard, you’ll see a number of tabs:

  • Status: Overview of the current state of the scheduler, including the active task stream, progress, memory per worker, and the number of tasks per worker.

  • Workers: The workers tab allows you to track CPU and memory use per worker.

  • System: Live tracking of system resources like CPU, memory, bandwidth, and open file descriptors.

  • Profile: Fine-grained statistical profiling

  • Info: Worker status and logs.

Another useful diagnostic tool is Dask’s static performance report. This allows you to save a report, including the task stream, worker profiles, etc., for all or a specific part of a workflow. Below is an example of how you would create such a report:

[18]:
from dask.distributed import performance_report

with performance_report(filename="dask-report.html"):
    big_calc.compute()

Exercise

Again, let’s modify the chunk size in big_ones (aim for ~100 MB). How does the Performance Report change with a larger chunk size?

[19]:
# your code here

with performance_report(filename="dask-report-large-chunk.html"):
    big_calc.compute()
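
One possible solution (a sketch): (3000, 4000) blocks of float64 are about 96 MB each, close to the ~100 MB target. bigshape, da, and performance_report come from the cells above:

# Rebuild big_ones with ~100 MB chunks and profile the same calculation.
big_ones_large = da.ones(bigshape, chunks=(3000, 4000))
big_calc_large = (big_ones_large * big_ones_large[::-1, ::-1]).mean()

with performance_report(filename="dask-report-large-chunk.html"):
    big_calc_large.compute()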

Distributed Dask clusters for HPC and Cloud environments

Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system. There is a growing ecosystem of Dask deployment projects that facilitate easy deployment and scaling of Dask clusters on a wide variety of computing systems.

HPC

Dask Jobqueue (https://jobqueue.dask.org/)

  • dask_jobqueue.PBSCluster

  • dask_jobqueue.SlurmCluster

  • dask_jobqueue.LSFCluster

  • etc.
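
For example, a hedged sketch of launching Dask workers through a PBS queue with dask_jobqueue (the resource numbers, queue name, and walltime are placeholders for whatever your HPC site requires):

from dask.distributed import Client
from dask_jobqueue import PBSCluster

# Each job submitted to the PBS queue runs Dask workers with these resources.
cluster = PBSCluster(
    cores=36,
    memory="100GB",
    queue="regular",       # placeholder queue name
    walltime="00:30:00",
)
cluster.scale(jobs=2)      # request two jobs' worth of workers
client = Client(cluster)   # then use the client exactly as with LocalCluster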

Dask MPI (https://mpi.dask.org/)

  • dask_mpi.initialize

Cloud

Dask Kubernetes (https://kubernetes.dask.org/)

  • dask_kubernetes.KubeCluster

Dask Cloud Provider (https://cloudprovider.dask.org)

  • dask_cloudprovider.FargateCluster

  • dask_cloudprovider.ECSCluster

Dask Gateway (https://gateway.dask.org/)

  • dask_gateway.GatewayCluster


Note: Pieces of this notebook come from the following sources: