This work is supported by Continuum Analytics and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

We measure the performance of Dask’s distributed scheduler for a variety of workloads under increasing scales of both problem and cluster size. This helps to answer questions about Dask’s scalability and also helps to educate readers on the sorts of computations that scale well.

We will vary our computations in a few ways to see how they stress performance. We consider the following:

  1. Computational and communication patterns like embarrassingly parallel, fully sequential, bulk communication, many-small communication, nearest neighbor, tree reductions, and dynamic graphs.

  2. Varying task duration, from very fast (microsecond) tasks to 100ms and 1s tasks. Faster tasks make it harder for the central scheduler to keep up with the workers.

  3. Varying cluster size from one two-core worker to 256 two-core workers and varying dataset size which we scale linearly with the number of workers. This means that we’re measuring weak scaling.

  4. Varying the API: tasks, multidimensional arrays, and dataframes, all of which have cases in the above categories but depend on different in-memory computational systems like NumPy or Pandas.
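
Because the dataset grows with the cluster, the ideal outcome under weak scaling is a runtime that stays flat as workers are added. A common way to summarize this is weak-scaling efficiency: the smallest cluster's runtime divided by the runtime at each larger size, where 1.0 means perfect scaling. The sketch below shows the calculation. The helper name and the timings in it are made up for illustration and are not our measurements.

```python
def weak_scaling_efficiency(timings):
    """Return {workers: efficiency} from {workers: wall-clock seconds}.

    Assumes the problem size grew in proportion to the number of workers,
    so the ideal runtime is constant and the ideal efficiency is 1.0.
    """
    baseline = timings[min(timings)]  # runtime on the smallest cluster
    return {n: baseline / t for n, t in sorted(timings.items())}


# Hypothetical timings for illustration only, not measured results.
example = {1: 2.0, 2: 2.0, 4: 2.5, 8: 4.0}
print(weak_scaling_efficiency(example))
```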

We will start with benchmarks for straight tasks, which are the most flexible system and also the easiest to understand. This will help us to understand scaling limits on arrays and dataframes.

Note: we did not tune our benchmarks or configuration at all for these experiments. The results are well below what is possible, but are perhaps representative of what a beginning user might experience after setting up a cluster without expertise or attention to configuration.

A Note on Benchmarks and Bias

You can safely skip this section if you’re in a rush.

This is a technical document, not a marketing piece. These benchmarks adhere to the principles laid out in this blogpost and attempt to avoid its pitfalls around developer bias. In particular, the following are true:

  1. We decided on a set of benchmarks before we ran them on a cluster.

  2. We did not improve the software or tweak the benchmarks after seeing the results. These were run on the current public release of Dask, which came out weeks ago, not on a development branch.

  3. The computations were constructed naively, as a novice would write them. They were not tweaked for extra performance.

  4. The cluster was configured naively, without attention to scale or special parameters.

We estimate that expert use would result in about a 5-10x scaling improvement over what we’ll see. We’ll detail how to improve scaling with expert methods at the bottom of the post.

All that being said, the author of this blogpost is paid to write this software, so you probably shouldn’t trust him. We invite readers to explore things independently. All configuration, notebooks, plotting code, and data are available below:


Tasks

We start by benchmarking the task scheduling API. Dask’s task scheduling APIs are at the heart of the other “big data” APIs (like dataframes). We start with tasks because they’re the simplest and rawest representation of Dask. Mostly we’ll run the following functions on integers, but you could fill in any function here, like a pandas dataframe method or sklearn routine.

import time

def inc(x):
    return x + 1

def add(x, y):
    return x + y

def slowinc(x, delay=0.1):
    time.sleep(delay)
    return x + 1

def slowadd(x, y, delay=0.1):
    time.sleep(delay)
    return x + y

def slowsum(L, delay=0.1):
    time.sleep(delay)
    return sum(L)

Embarrassingly Parallel Tasks

We run the following code on our cluster and measure how long each batch of tasks takes to complete:

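from dask.distributed import wait

# `client` is a connected dask.distributed Client; `n` and `n_cores`
# are scale parameters that grow with the size of the cluster
futures = client.map(slowinc, range(4 * n), delay=1)  # 1s delay
wait(futures)
futures = client.map(slowinc, range(100 * n_cores))  # 100ms delay
wait(futures)
futures = client.map(inc, range(n_cores * 200))      # fast
wait(futures)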

We see that for fast tasks the system can process around 2000-3000 tasks per second. This is mostly bound by scheduler and client overhead. Adding more workers into the system doesn’t give us any more tasks per second. However if our tasks take any amount of time (like 100ms or 1s) then we see decent speedups.
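
A back-of-the-envelope model makes this concrete. Suppose, as a simplifying assumption, that the scheduler can handle at most a fixed number of tasks per second, and that the cores otherwise work perfectly in parallel. The hypothetical `scheduler_rate=2500` below sits in the middle of the 2000-3000 range we observed. Under those assumptions, a batch of tasks takes roughly the larger of two times: the compute time spread over the cores, and the time the scheduler needs to process every task. The function name is ours, for illustration.

```python
def predicted_runtime(n_tasks, task_seconds, n_cores, scheduler_rate=2500.0):
    """Crude lower bound on wall-clock time for a batch of independent tasks.

    scheduler_rate is an assumed scheduler throughput in tasks/second; this
    ignores communication, startup costs, and load imbalance.
    """
    compute_bound = n_tasks * task_seconds / n_cores  # perfect parallelism
    scheduler_bound = n_tasks / scheduler_rate        # scheduler overhead
    return max(compute_bound, scheduler_bound)


for cores in (2, 32, 512):
    fast = predicted_runtime(200 * cores, 1e-6, cores)  # microsecond tasks
    slow = predicted_runtime(4 * cores, 1.0, cores)     # 1s tasks
    print(f"{cores:>4} cores: fast={fast:.3f}s  slow={slow:.3f}s")
```

In this model the scheduler term dominates for microsecond tasks at every cluster size, so adding workers buys nothing. For 1s tasks the compute term dominates, so runtime stays flat as problem and cluster grow together. It is deliberately crude and ignores communication and per-worker overhead.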

If you switch to linear scales on the plots, you’ll see that as we get out to 512 cores we start to slow down by about a factor of two. I’m surprised to see this behavior (hooray benchmarks) because all of Dask’s scheduling decisions are independent of cluster size. My first guess is that the scheduler may be getting swamped with administrative messages, but we’ll have to dig a bit deeper here.

Tree Reduction

Not all computations are embarrassingly parallel. Many computations have dependencies between them. Consider a tree reduction, where we combine neighboring elements until there is only one left. This stresses task dependencies and small data movement.

from dask import delayed

L = range(2**7 * n)
while len(L) > 1:  # while there is more than one element left
    # add neighbors together
    L = [delayed(slowadd)(a, b) for a, b in zip(L[::2], L[1::2])]

L[0].compute()

We see similar scaling to the embarrassingly parallel case. Things proceed linearly until they get to around 3000 tasks per second, at which point they fall behind linear scaling. Dask doesn’t seem to mind dependencies, even custom situations like this one.
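
The structure of that loop is easy to reason about without a cluster. The sketch below is plain Python with no Dask, and `tree_reduce` is a hypothetical helper name. It performs the same pairwise combination while counting tasks and levels. For 2^k inputs it creates 2^k - 1 combine tasks arranged in k levels, so the dependency chain grows only logarithmically with the data. The benchmark loop silently drops the last element when a level has odd length. Unlike that loop, this version carries an unpaired element forward to the next level.

```python
import operator


def tree_reduce(values, combine):
    """Pairwise tree reduction; returns (result, number_of_tasks, depth)."""
    level = list(values)
    n_tasks = 0
    depth = 0
    while len(level) > 1:
        # combine neighbors (0, 1), (2, 3), ... just like the delayed loop
        paired = [combine(a, b) for a, b in zip(level[::2], level[1::2])]
        n_tasks += len(paired)
        if len(level) % 2:  # odd length: carry the leftover element forward
            paired.append(level[-1])
        level = paired
        depth += 1
    return level[0], n_tasks, depth


total, n_tasks, depth = tree_reduce(range(2**7 * 4), operator.add)
print(total, n_tasks, depth)  # 130816 511 9
```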

Nearest Neighbor

Nearest neighbor computations are common in data analysis when you need to share a bit of data between neighboring elements. This frequently happens in timeseries computations on dataframes, overlapping image processing on arrays, and PDE computations.

L = range(20 * n)
L = client.map(slowadd, L[:-1], L[1:])
L = client.map(slowadd, L[:-1], L[1:])
wait(L)

Scaling is similar to the tree reduction case. Interesting dependency structures don’t incur significant overhead or scaling costs.
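
To see what each task depends on, here is a plain-Python sketch of the same two passes. It uses no Dask, and `neighbor_pass` is a hypothetical name. Each pass combines every element with its right-hand neighbor, so the output shrinks by one element per pass. Running it with set union instead of addition shows that after two passes each result depends on exactly three consecutive inputs. That is the small, local data sharing this pattern exercises.

```python
import operator


def neighbor_pass(values, combine):
    """Combine each element with its right-hand neighbor."""
    values = list(values)
    return [combine(a, b) for a, b in zip(values[:-1], values[1:])]


# Numeric version, mirroring the two client.map calls.
L = range(20)
L = neighbor_pass(L, operator.add)
L = neighbor_pass(L, operator.add)
print(L[:3])  # [4, 8, 12]

# Dependency version: track which original inputs feed each output.
deps = [{i} for i in range(20)]
deps = neighbor_pass(deps, operator.or_)
deps = neighbor_pass(deps, operator.or_)
print(deps[:2])  # [{0, 1, 2}, {1, 2, 3}]
```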

Sequential

We consider a computation that isn’t parallel at all, but is instead highly sequential. Increasing the number of workers shouldn’t help here (there is only one thing to do at a time) but this does demonstrate the extra stresses that arise from a large number of workers. Note that we have turned off task fusion for this, so here we’re measuring how many roundtrips can occur between the scheduler and worker every second.

x = 1

for i in range(100):
    x = delayed(inc)(x)

x.compute()

So we get something like 100 roundtrips per second, or roundtrip latencies of around 10ms. It turns out that a decent chunk of this cost was due to an optimization: workers prefer to batch small messages for higher throughput. In this case that optimization hurts us. Still, we’re about 2-4x faster than video frame rate here (video runs at around 24Hz, or about 40ms between frames).
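
The shape of this workload can be sketched locally with the standard library's `concurrent.futures`, which is not Dask. Each step must wait for the previous result before the next task can be submitted. As a result the extra workers sit idle, and the runtime is simply the number of steps times the per-step latency. `sequential_chain` is a hypothetical name. A local thread pool has far lower latency than a networked scheduler, so the timing this prints says nothing about Dask itself.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


def sequential_chain(steps, max_workers=4):
    """Run `steps` dependent increments; return (result, seconds per step)."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        x = 1
        start = time.perf_counter()
        for _ in range(steps):
            # .result() blocks, so only one task is ever in flight
            x = executor.submit(inc, x).result()
        elapsed = time.perf_counter() - start
    return x, elapsed / steps


result, per_step = sequential_chain(100)
print(result, f"{per_step * 1e6:.1f} microseconds per roundtrip")
```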

Client in the loop

Finally we consider a reduction that consumes whichever futures finish first and adds them together. This is an example of using client-side logic within the computation, which is often helpful in complex algorithms. This also scales a little bit better because there are fewer dependencies to track within the scheduler. The client takes on a bit of the load.

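from dask.distributed import as_completed

futures = client.map(slowinc, range(n * 20))

pool = as_completed(futures)
batches = pool.batches()  # yields lists of futures as they finish

while True:
    try:
        batch = next(batches)
        if len(batch) == 1:
            # need at least two values to add; wait for more
            batch += next(batches)
    except StopIteration:
        # pool is exhausted; `batch` holds the future with the final sum
        break
    future = client.submit(slowsum, batch)
    pool.add(future)  # feed the partial sum back into the pool

result = batch[0].result()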
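
Python’s standard library offers a rough local analogue, sketched below with `concurrent.futures` rather than Dask. Its `as_completed` takes a fixed set of futures and cannot accept new ones mid-iteration, the way Dask’s `as_completed(...).add` can. This sketch therefore loops on `wait(..., return_when=FIRST_COMPLETED)` instead. It sums whatever results have arrived in batches of two or more and feeds each partial sum back into the pending set until a single value remains. The helper name `reduce_as_completed` and the short delays are illustrative assumptions.

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def slowinc(x, delay=0.01):
    time.sleep(delay)
    return x + 1


def slowsum(values, delay=0.01):
    time.sleep(delay)
    return sum(values)


def reduce_as_completed(n, max_workers=8):
    """Sum slowinc(i) for i in range(n), combining results as they finish."""
    if n < 1:
        raise ValueError("need at least one input")
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = {executor.submit(slowinc, i) for i in range(n)}
        ready = []  # finished values not yet folded into a sum
        while pending or len(ready) > 1:
            if len(ready) >= 2:
                # hand the whole batch back to the pool as one new task
                batch, ready = ready, []
                pending.add(executor.submit(slowsum, batch))
                continue
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            ready.extend(f.result() for f in done)
        return ready[0]


print(reduce_as_completed(20))  # 210
```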

Tasks: Complete

We show most of the plots from above for comparison.

Arrays

When we combine NumPy arrays with the task scheduling system above we get dask.array, a distributed multi-dimensional array. This section shows computations like the last section (maps, reductions, nearest-neighbor), but now these computations are motivated by actual data-oriented computations and involve real data movement.

Create Dataset

We make a square array of random integers. Its side length grows with the square root of the number of cores, so the total number of elements scales linearly with the cluster. We cut it into uniform chunks of size 2000 by 2000.

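import math
import dask.array as da
from dask.distributed import wait

N = int(5000 * math.sqrt(n_cores))
x = da.random.randint(0, 10000, size=(N, N), chunks=(2000, 2000))
x = x.persist()  # compute the chunks and keep them in cluster memory
wait(x)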

Creating this array is embarrassingly parallel. There is an odd corner in the graph here that I’m not able to explain.
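
Some quick arithmetic on the sizes involved may help. It confirms that the number of chunks grows roughly linearly with the core count, as weak scaling requires. `chunk_grid` is a hypothetical helper. Its `itemsize=8` is an assumption that the array holds 64-bit integers, which would make each full 2000 by 2000 chunk about 32 MB.

```python
import math


def chunk_grid(n_cores, base=5000, chunk=2000, itemsize=8):
    """Return (side length, chunks per side, total chunks, bytes per full chunk).

    itemsize=8 assumes 64-bit integers; edge chunks may be smaller than full.
    """
    n = int(base * math.sqrt(n_cores))  # same formula as N above
    per_side = math.ceil(n / chunk)     # partial edge chunks count too
    return n, per_side, per_side ** 2, chunk * chunk * itemsize


for cores in (1, 4, 16, 64):
    n, per_side, total, nbytes = chunk_grid(cores)
    print(cores, n, total, f"{total / cores:.2f} chunks/core",
          f"{nbytes / 1e6:.0f} MB/chunk")
```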

Elementwise Computation

We perform some numerical computation element-by-element on this array.

y = da.sin(x) ** 2 + da.cos(x) ** 2
y = y.persist()
wait(y)

This is also embarrassingly parallel. Each task here takes around 300ms (the time it takes to run this on a single 2000 by 2000 NumPy array chunk).
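
Elementwise operations like this map one function over every chunk independently, with no communication between chunks. The sketch below shows that blockwise pattern in pure Python, without NumPy or Dask. `blockwise_map`, `split_blocks`, and `sin2_plus_cos2` are hypothetical names, and lists of lists stand in for arrays. Since sin(v)**2 + cos(v)**2 equals 1, every output value should be 1 up to rounding.

```python
import math


def split_blocks(matrix, size):
    """Yield (row_offset, col_offset, block) tiles of at most size x size."""
    for r in range(0, len(matrix), size):
        for c in range(0, len(matrix[0]), size):
            yield r, c, [row[c:c + size] for row in matrix[r:r + size]]


def sin2_plus_cos2(block):
    """Per-chunk work: sin(v)**2 + cos(v)**2 for every element."""
    return [[math.sin(v) ** 2 + math.cos(v) ** 2 for v in row] for row in block]


def blockwise_map(matrix, size, func):
    """Apply func to each block independently and stitch the results back."""
    out = [[None] * len(matrix[0]) for _ in matrix]
    for r, c, block in split_blocks(matrix, size):
        result = func(block)  # no block needs data from any other block
        for i, row in enumerate(result):
            out[r + i][c:c + len(row)] = row
    return out


grid = [[7 * i + j for j in range(7)] for i in range(5)]  # a 5x7 "array"
out = blockwise_map(grid, 3, sin2_plus_cos2)             # 3x3 "chunks"
print(max(abs(v - 1.0) for row in out for v in row))
```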

Reductions

We compute the standard deviation of the array. Like a sum, this is implemented as a tree reduction.

x.std().compute()
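
A standard deviation cannot be computed by simply adding partial results. Instead, each chunk contributes a small summary, and the summaries are merged pairwise up the tree. The sketch below uses one well-known merge rule: it tracks count, mean, and sum of squared deviations, following the pairwise variance update of Chan et al. We are not claiming this is the exact formula dask.array uses internally. The function names are hypothetical. It computes the population standard deviation (ddof=0), which is NumPy’s default for `std`.

```python
import math
import statistics


def chunk_summary(values):
    """(count, mean, sum of squared deviations) for one chunk."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values)
    return n, mean, m2


def merge(a, b):
    """Combine two chunk summaries (pairwise variance update)."""
    na, mean_a, m2a = a
    nb, mean_b, m2b = b
    n = na + nb
    delta = mean_b - mean_a
    mean = mean_a + delta * nb / n
    m2 = m2a + m2b + delta ** 2 * na * nb / n
    return n, mean, m2


def tree_std(chunks):
    """Population standard deviation via a pairwise tree of merges."""
    level = [chunk_summary(c) for c in chunks]
    while len(level) > 1:
        merged = [merge(a, b) for a, b in zip(level[::2], level[1::2])]
        if len(level) % 2:  # carry an unpaired summary up a level
            merged.append(level[-1])
        level = merged
    n, _, m2 = level[0]
    return math.sqrt(m2 / n)


data = list(range(1000))
chunks = [data[i:i + 128] for i in range(0, len(data), 128)]
print(tree_std(chunks), statistics.pstdev(data))
```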