Introduction to Dask¶
In this lesson, we cover the basics of Dask. Our learning goals are as follows. By the end of the lesson, we will be able to:
Identify and describe Dask Collections (Array, DataFrame) and Schedulers
Work with Dask Arrays in much the same way you would work with a NumPy array
Understand some of the tradeoffs surrounding chunk size, chunk shape, and computational overhead
Deploy a local Dask Distributed Cluster and access the diagnostics dashboard
What is Dask?¶
Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling, high-level big-data collections like dask.array and dask.dataframe, and an extensive suite of deployment options. Dask’s documentation can be found here: https://docs.dask.org/en/latest/
Quick setup¶
For the purposes of this notebook, we’ll use a Dask Cluster to manage computations. The next cell sets up a simple LocalCluster. We’ll cover Dask schedulers and clusters later on in this notebook.
[1]:
from dask.distributed import Client
client = Client()
client
[1]:
Client

Cluster

👆
Click the Dashboard link above.
Dask Collections¶
Dask includes 3 main collections:
Dask Array: Parallel NumPy arrays
Dask DataFrame: Parallel Pandas DataFrames
Dask Bag: Parallel Python Lists
Xarray primarily interfaces with the Dask Array collection so we’ll skip the others for now. You can find out more about Dask’s user interfaces here.
Dask Arrays¶
Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using multiple cores. We coordinate these blocked algorithms using Dask graphs. Dask Arrays are also lazy, meaning that they do not evaluate until you explicitly ask for a result using the compute method.
If we want to create a NumPy array of all ones, we do it like this:
[2]:
import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
[2]:
array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])
This array contains exactly 32 MB of data:
[3]:
print("%.1f MB" % (ones_np.nbytes / 1e6))
32.0 MB
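The 32 MB figure follows directly from the shape and the element size. As a quick check (a sketch that assumes NumPy's default dtype of float64, which uses 8 bytes per element):

```python
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)

# Memory footprint = number of elements * bytes per element.
n_elements = ones_np.size               # 1000 * 4000
bytes_per_element = ones_np.itemsize    # 8 for float64
expected_nbytes = n_elements * bytes_per_element

print(n_elements, bytes_per_element, expected_nbytes)
```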
Now let’s create the same array using Dask’s array interface.
[4]:
import dask.array as da
ones = da.ones(shape)
ones
[4]:

This works, but we didn’t tell Dask how to split up (or chunk) the array, so it is not optimized for parallel computation.
A crucial difference with Dask is that we should specify the chunks argument. “Chunks” describes how the array is split up over many subarrays.
Source: Dask Array Documentation
There are several ways to specify chunks. In this lecture, we will use a block shape.
[5]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
[5]:

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a Dask array, the computation is triggered and the Dask array becomes a NumPy array.
[6]:
ones.compute()
[6]:
array([[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
...,
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.],
[1., 1., 1., ..., 1., 1., 1.]])
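A block shape is only one of the ways to describe chunks. The sketch below shows a few other forms that the chunks argument accepts and inspects the resulting .chunks attribute. The variable names are illustrative and not part of the lesson.

```python
import dask.array as da

shape = (1000, 4000)

# Block shape: every chunk is (at most) 1000 x 1000.
by_shape = da.ones(shape, chunks=(1000, 1000))

# A single integer applies the same block length to every dimension.
by_int = da.ones(shape, chunks=500)

# Explicit chunk sizes along each dimension; they need not be uniform.
explicit = da.ones(shape, chunks=((400, 600), (4000,)))

# -1 means "do not split this dimension".
full_rows = da.ones(shape, chunks=(-1, 2000))

for name, arr in [("by_shape", by_shape), ("by_int", by_int),
                  ("explicit", explicit), ("full_rows", full_rows)]:
    print(name, arr.numblocks, arr.chunks)
```

None of these calls allocates array data; only the chunk layout is recorded until .compute() is called.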
In order to understand what happened when we called .compute(), we can visualize the Dask graph, the symbolic operations that make up the array.
[7]:
ones.visualize()
[7]:
Our array has four chunks. To generate it, Dask calls np.ones four times and then concatenates the results together into one array.
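To make the graph concrete, here is a hand-written NumPy equivalent of what it describes. This is only an illustration of the idea, not how Dask executes the graph internally.

```python
import numpy as np
import dask.array as da

shape = (1000, 4000)
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)

# The array is split into 1 block of rows and 4 blocks of columns.
print(ones.numblocks)

# Build each block separately, then join the blocks along the column axis.
blocks = [np.ones(chunk_shape) for _ in range(ones.npartitions)]
manual = np.concatenate(blocks, axis=1)

print(manual.shape)
print(np.array_equal(manual, ones.compute()))
```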
Rather than immediately loading a Dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:
[8]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()
[8]:
Exercise¶
Modify the chunk size (or shape) in the ones array and visualize how the task graph changes.
[9]:
# your code here
Here we see Dask’s strategy for finding the sum. This simple example illustrates the beauty of Dask: it automatically designs an algorithm appropriate for custom operations with big data.
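The strategy in the graph can be sketched by hand: sum each chunk on its own, then combine the partial sums. This is a simplified illustration; Dask's actual reduction may aggregate partial results in several stages.

```python
import dask.array as da

ones = da.ones((1000, 4000), chunks=(1000, 1000))

# Step 1: reduce every chunk independently (these steps could run in parallel).
# .blocks[i, j] selects a single chunk as a small Dask array.
partial_sums = [
    ones.blocks[i, j].sum().compute()
    for i in range(ones.numblocks[0])
    for j in range(ones.numblocks[1])
]

# Step 2: combine the per-chunk results into the final answer.
manual_total = sum(partial_sums)

print(partial_sums)
print(manual_total, ones.sum().compute())
```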
If we make our operation more complex, the graph gets more complex.
[10]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()
[10]:
A Bigger Calculation¶
The examples above were toy examples; the data (32 MB) is probably not big enough to warrant the use of Dask.
We can make it a lot bigger!
[11]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones
[11]:

[12]:
print("%.1f MB" % (big_ones.nbytes / 1e6))
6400.0 MB
This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of RAM available in your computer. Nevertheless, Dask has no problem working on it.
Do not try to .visualize() this array!
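The chunk count shows why drawing this graph is a bad idea. Because Dask arrays are lazy, the sketch below only inspects metadata and allocates no array data.

```python
import math
import dask.array as da

bigshape = (200000, 4000)
chunk_shape = (1000, 1000)
big_ones = da.ones(bigshape, chunks=chunk_shape)

# Metadata is available without computing anything.
n_chunks = big_ones.npartitions  # blocks of rows * blocks of columns
chunk_mb = math.prod(big_ones.chunksize) * big_ones.dtype.itemsize / 1e6
total_gb = big_ones.nbytes / 1e9

print(big_ones.numblocks, n_chunks)
print("%.1f MB per chunk, %.1f GB in total" % (chunk_mb, total_gb))
```

With hundreds of chunks, even the graph that merely creates the array has hundreds of nodes, which is far too many to draw legibly.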
When doing a big calculation, Dask also has some tools to help us understand what is happening under the hood. Let’s watch the dashboard again as we do a bigger computation.
[13]:
big_calc = (big_ones * big_ones[::-1, ::-1]).mean()
result = big_calc.compute()
result
[13]:
1.0
Reduction¶
All the usual NumPy methods work on Dask arrays. You can also apply NumPy functions directly to a Dask array, and it will stay lazy.
[14]:
big_ones_reduce = (np.cos(big_ones) ** 2).mean(axis=1)
big_ones_reduce
[14]:
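To confirm that the result stays lazy, the types can be checked before and after .compute(). The sketch below uses a small stand-in array (small_ones is an illustrative name) so that it runs quickly.

```python
import numpy as np
import dask.array as da

# A small stand-in array so the example runs quickly.
small_ones = da.ones((8, 6), chunks=(4, 3))

# NumPy ufuncs dispatch to Dask, so the result is still a lazy Dask array.
lazy = (np.cos(small_ones) ** 2).mean(axis=1)
print(type(lazy), lazy.shape)

# Only .compute() produces concrete values.
values = lazy.compute()
print(type(values), values)
```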

Plotting also triggers computation, since we need the actual values:
[15]:
%matplotlib inline
from matplotlib import pyplot as plt
[16]:
plt.plot(big_ones_reduce)
[16]:
[<matplotlib.lines.Line2D at 0x7fb01a298910>]
Parallelism using the dask.distributed scheduler¶
In the first cell of this notebook, we started a local Dask Cluster and Client. We skipped past some important details there that we’ll unpack now.
Dask Schedulers¶
The Dask Schedulers orchestrate the tasks in the Task Graphs so that they can be run in parallel. How they run in parallel, though, is determined by which Scheduler you choose.
There are 3 local schedulers:
Single-Thread Local: For debugging, profiling, and diagnosing issues
Multithreaded: Using the Python built-in threading package (the default for all Dask operations except Bags)
Multiprocess: Using the Python built-in multiprocessing package (the default for Dask Bags)
and 1 distributed scheduler, which we will talk about later:
Distributed: Using the dask.distributed module (which uses tornado for communication over TCP). The distributed scheduler uses a Cluster to manage communication between the scheduler and the “workers”. This is described in the next section.
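As a minimal sketch of how a local scheduler can be chosen, the scheduler keyword of .compute() and dask.config.set both accept a scheduler name. The example uses "synchronous" (single-thread) and "threads"; "processes" selects the multiprocess scheduler in the same way.

```python
import dask
import dask.array as da

x = da.ones((1000, 4000), chunks=(1000, 1000)).sum()

# Pick a scheduler for a single call.
single = x.compute(scheduler="synchronous")  # single-thread, easiest to debug
threaded = x.compute(scheduler="threads")    # thread pool

# Or set it for every compute inside a block.
with dask.config.set(scheduler="synchronous"):
    scoped = x.compute()

print(single, threaded, scoped)
```

The result is the same in every case; only the way the tasks are executed differs.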
Distributed Clusters (http://distributed.dask.org/)¶
LocalCluster: Creates a Cluster that can be executed locally. Each Cluster includes a Scheduler and Workers.
Client: Connects to and drives computation on a distributed Cluster.
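The two pieces can also be created explicitly rather than through a bare Client(). The sketch below is one possible configuration, not a recommendation: the worker counts are arbitrary, processes=False keeps the workers inside the current process, and demo_cluster and demo_client are illustrative names chosen so that they do not replace the client created earlier.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

lazy_sum = da.ones((1000, 1000), chunks=(250, 250)).sum()

# dashboard_address=":0" asks for any free port, avoiding a clash with a
# dashboard that is already running.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=":0",
) as demo_cluster:
    # set_as_default=False leaves any existing default client untouched.
    with Client(demo_cluster, set_as_default=False) as demo_client:
        print(demo_client.dashboard_link)
        # Submit the lazy computation to this cluster and wait for the result.
        total = demo_client.compute(lazy_sum).result()

print(total)
```

Leaving the with blocks closes the client and shuts down the scheduler and workers.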
Profiling & Diagnostics using the Dask Dashboard¶
You’ll recall from above that we opened a URL to the Dask Dashboard:
[17]:
client
[17]:
Client

Cluster

The dashboard that the Dask distributed scheduler provides is an incredibly valuable tool for gaining insights into the performance of your computation and the cluster as a whole. In the dashboard, you’ll see a number of tabs:
Status: Overview of the current state of the scheduler, including the active task stream, progress, memory per worker, and the number of tasks per worker.
Workers: The workers tab allows you to track CPU and memory use per worker.
System: Live tracking of system resources like CPU, memory, bandwidth, and open file descriptors
Profile: Fine-grained statistical profiling
Info: Worker status and logs.
Another useful diagnostic tool is Dask’s static performance report. This allows you to save a report, including the task stream, worker profiles, etc. for all or a specific part of a workflow. Below is an example of how you would create such a report:
[18]:
from dask.distributed import performance_report
with performance_report(filename="daskreport.html"):
    big_calc.compute()
Exercise¶
Again, let’s modify the chunk size in big_ones (aim for ~100 MB). How does the Performance Report change with a larger chunk size?
[19]:
# your code here
with performance_report(filename="daskreportlargechunk.html"):
    big_calc.compute()
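When choosing a chunk shape for this exercise, it helps to check the per-chunk memory and the chunk count before running anything. The sketch below does so for one candidate shape. The shape (12500, 1000) is only an illustrative choice, and chunk_info is a hypothetical helper, not part of Dask.

```python
import math
import dask.array as da

bigshape = (200000, 4000)

def chunk_info(arr):
    """Return (MB per full-size chunk, number of chunks) for a Dask array."""
    chunk_mb = math.prod(arr.chunksize) * arr.dtype.itemsize / 1e6
    return chunk_mb, arr.npartitions

small_chunks = da.ones(bigshape, chunks=(1000, 1000))
large_chunks = da.ones(bigshape, chunks=(12500, 1000))

print(chunk_info(small_chunks))
print(chunk_info(large_chunks))
```

Fewer, larger chunks mean fewer tasks for the scheduler to manage, but each task then needs more memory on the worker that runs it.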
Distributed Dask clusters for HPC and Cloud environments¶
Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system. There is a growing ecosystem of Dask deployment projects that facilitate easy deployment and scaling of Dask clusters on a wide variety of computing systems.
HPC¶
Dask Jobqueue (https://jobqueue.dask.org/)¶
dask_jobqueue.PBSCluster
dask_jobqueue.SLURMCluster
dask_jobqueue.LSFCluster
etc.
Dask MPI (https://mpi.dask.org/)¶
dask_mpi.initialize
Cloud¶
Dask Kubernetes (https://kubernetes.dask.org/)¶
dask_kubernetes.KubeCluster
Dask Cloud Provider (https://cloudprovider.dask.org)¶
dask_cloudprovider.FargateCluster
dask_cloudprovider.ECSCluster
Dask Gateway (https://gateway.dask.org/)¶
dask_gateway.GatewayCluster
Note: Pieces of this notebook come from the following sources: