---
blogpost: true
date: Jan 30, 2017
title: Dask Development Log
tags: Programming, Python, scipy
---

_This work is supported by [Continuum Analytics](http://continuum.io),
the [XDATA Program](http://www.darpa.mil/program/XDATA),
and the Data Driven Discovery Initiative from the [Moore
Foundation](https://www.moore.org/)._

To increase transparency I'm blogging weekly about the work done on Dask and
related projects during the previous week. This log covers work done between
2017-01-17 and 2017-01-30. Nothing here is ready for production. This
blogpost is written in haste, so refined polish should not be expected.

Themes of the last couple of weeks:

1.  Micro-release of distributed scheduler
2.  as_completed for asynchronous algorithms
3.  Testing ZeroMQ and communication refactor
4.  Persist and Dask-GLM

### Stability enhancements and micro-release

We've released dask.distributed version 1.15.2, which includes some important
performance improvements for communicating multi-dimensional arrays, cleaner
scheduler shutdown of workers for adaptive deployments, an improved
as_completed iterator that can accept new futures in flight, and a few other
smaller features.

The feature here that excites me the most is improved communication of
multi-dimensional arrays across the network. In a
[recent blogpost about image processing on a cluster](/2017/01/17/dask-images)
we noticed that communication bandwidth was far lower than expected. This led
us to uncover a flaw in our compression heuristics. Dask doesn't compress all
data. Instead it takes a few 10kB samples of the data, compresses them, and if
that goes well, decides to compress the entire thing. Unfortunately, due to
our mishandling of memoryviews we ended up taking _much_ larger samples than
10kB when dealing with multi-dimensional arrays.
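To make the heuristic concrete, here is a minimal sketch of sample-then-decide compression using only the standard library. It is not Dask's implementation: the function name `maybe_compress`, the number of samples, the 0.9 threshold, and the choice of zlib are all assumptions made for illustration. Flattening the buffer to bytes before slicing is one way to make sure each sample really is `sample_size` bytes, even when the input is a multi-dimensional buffer.

```python
import zlib


def maybe_compress(data, sample_size=10_000, n_samples=5, threshold=0.9):
    """Compress ``data`` only if a few small samples compress well.

    Hypothetical helper: the names and default values are illustrative only.
    Returns a (compression_name, payload) pair.
    """
    # Flatten to a one-dimensional view of bytes so that slices count bytes,
    # not rows of a multi-dimensional buffer.
    view = memoryview(data).cast("B")
    nbytes = view.nbytes

    if nbytes <= sample_size * n_samples:
        # Small input: the whole thing is the sample
        sample = bytes(view)
    else:
        # Take evenly spaced samples of sample_size bytes each
        stride = nbytes // n_samples
        sample = b"".join(
            bytes(view[i * stride : i * stride + sample_size])
            for i in range(n_samples)
        )

    # If the sample barely shrinks, skip compressing the full payload
    if len(zlib.compress(sample)) > threshold * len(sample):
        return "none", bytes(view)
    return "zlib", zlib.compress(view)
```

Highly repetitive input comes back tagged `"zlib"`, while random bytes fail the sample test and are passed through untouched, so the cost of the check stays bounded by the sample size rather than by the size of the array.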

### XArray release

This improvement is particularly timely because a new release of
[XArray](http://xarray.pydata.org/en/stable/) (a project that wraps around
Dask.array for large labeled arrays) is now available with better data
ingestion support for NetCDF data on distributed clusters. This opens up
distributed array computing to Dask's first (and possibly largest) scientific
user community, atmospheric scientists.

### as_completed accepts futures

In addition to arrays, dataframes, and the delayed decorator, Dask.distributed
also implements the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html)
interface from the standard library (except that Dask's version parallelizes
across a cluster and has a few other benefits). Part of this interface is the
[as_completed](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed) function, which takes in a
list of futures and yields those futures in the order in which they finish.
This enables the construction of fairly responsive asynchronous computations.
As soon as some work finishes you can look at the result and submit more work
based on the current state.

That has been around in Dask for some time.

What's _new_ is that you can now push more futures into `as_completed`:

```python
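from dask.distributed import as_completed

# `client`, `my_function`, `function`, `sequence`, and `condition` are
# placeholders for your own objects
futures = client.map(my_function, sequence)

ac = as_completed(futures)
for future in ac:
    result = future.result()  # future is already finished, so this is fast
    if condition:
        new_future = client.submit(function, *args, **kwargs)
        ac.add(new_future)    # <<---- This is the new ability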
```

So the `as_completed` iterator can keep going for quite a while with a set of
futures always in flight. This relatively simple change allows for the easy
expression of a broad set of asynchronous algorithms.
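For readers without a cluster at hand, the same "look at a result, then submit more work" pattern can be sketched with only the standard library. This is a stand-in, not Dask code: the standard library's `as_completed` does not accept new futures once iteration has started, so the sketch emulates that ability with `wait(..., return_when=FIRST_COMPLETED)`. The functions `collatz_step` and `run_until_one` are made up for illustration.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def collatz_step(n):
    """One step of the Collatz map; a stand-in for real work."""
    return n // 2 if n % 2 == 0 else 3 * n + 1


def run_until_one(starts, max_workers=4):
    """Follow each starting value until it reaches 1.

    Follow-up work is submitted as soon as each result arrives, so some
    futures stay in flight until every chain has finished.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {pool.submit(collatz_step, n) for n in starts}
        while pending:
            # Block until at least one future finishes
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                value = future.result()  # already finished, so this is fast
                results.append(value)
                if value != 1:
                    # Decide on new work based on the result just seen
                    pending.add(pool.submit(collatz_step, value))
    return results
```

With Dask the explicit `wait` loop disappears: the `for future in ac` loop plus `ac.add(new_future)` expresses the same thing.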

### ZeroMQ and Communication Refactor

As part of a large effort, Antoine Pitrou has been refactoring Dask's
communication system. One sub-goal of this refactor was to allow us to explore
other transport mechanisms than Tornado IOStreams in a pluggable way.

One such alternative is ZeroMQ sockets. We've heard both enthusiastic praise
for and strong warnings against using ZeroMQ. It's not a great
fit because Dask mostly just does point-to-point communication, so we don't
benefit from all of ZeroMQ's patterns, which now become more of a hindrance
than a benefit. However, we were interested in the performance impact of
managing all of our network communication in a separately managed
C++ thread unaffected by GIL issues.

Whether you hate or love ZeroMQ, you can now pick and choose. Antoine's
branch allows for easy swapping between transport mechanisms and opens the
possibility for more in the future, like intra-process communication with
Queues, MPI, etc. This doesn't affect most users, but some Dask deployments
are on supercomputers with exotic networks capable of very fast speeds. The
possibility that we might tap into Infiniband someday and have the ability to
manage data locally without copies (Tornado does not achieve this) is very
attractive to some user communities.
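One common way to make transports swappable is a registry keyed by the scheme prefix of an address. The sketch below illustrates that general idea and nothing more: it is not the API of Antoine's branch, and every name in it (`InProcComm`, `backends`, `connect`, the `inproc://` scheme) is hypothetical.

```python
import queue


class InProcComm:
    """Toy transport: messages travel through an in-memory queue."""

    def __init__(self, address):
        self.address = address
        self._queue = queue.Queue()

    def write(self, msg):
        self._queue.put(msg)

    def read(self):
        # Raises queue.Empty if nothing has been written yet
        return self._queue.get_nowait()


# Registry mapping a scheme to the class that implements it (hypothetical).
# A TCP, ZeroMQ, or MPI backend would be registered here in the same way.
backends = {"inproc": InProcComm}


def connect(uri, default_scheme="inproc"):
    """Pick a transport based on the scheme prefix of ``uri``."""
    scheme, sep, address = uri.partition("://")
    if not sep:  # no scheme given, fall back to the default
        scheme, address = default_scheme, uri
    try:
        backend = backends[scheme]
    except KeyError:
        raise ValueError(f"unknown transport scheme: {scheme!r}") from None
    return backend(address)
```

Code that calls `connect("inproc://worker-1")` never needs to know which backend it got back, which is what makes it cheap to experiment with alternatives.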

After very preliminary benchmarking we've found that ZeroMQ offers a small
speedup, but results in a lack of stability in the cluster under complex
workloads (likely our fault, not ZeroMQ's, but still). ZeroMQ support is
strictly experimental and will likely stay that way for a long time. Readers
should not get excited about this.

### Persist and Dask-GLM

In collaboration with researchers at Capital One we've been working on a [set
of parallel solvers for first and second order
methods](http://github.com/dask/dask-glm), such as are commonly used in a broad
class of statistical and machine learning algorithms.

One challenge in this process has been building algorithms that are
simultaneously optimal for both the single machine and distributed schedulers.
The distributed scheduler requires that we think about where data is, on the
client or on the cluster, whereas for the single machine scheduler this is less
of a problem. The distributed scheduler appropriately has a new verb,
`persist`, which keeps data as a Dask collection, but triggers all of the
internal computation:

```python
compute(dask array) -> numpy array
persist(dask array) -> dask array
```

We have now mirrored this verb to the single machine scheduler in [dask/dask
#1927](https://github.com/dask/dask/pull/1927) and we get very nice performance
on dask-glm's algorithms in both cases now.
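As a small illustration of the difference on the single machine scheduler, here is a sketch using `dask.array`. The array shape, chunk sizes, and the computation itself are arbitrary choices for the example, and it assumes a Dask version recent enough to include the change above.

```python
import dask.array as da

# Build a lazy computation; nothing runs yet
x = da.ones((1000, 1000), chunks=(250, 250))
y = (x + 1).sum(axis=0)

# persist: runs the work but hands back a Dask array, so further
# Dask operations can build on the already-computed chunks
persisted = y.persist()

# compute: runs the work and hands back a concrete NumPy array
computed = y.compute()

print(type(persisted).__name__, type(computed).__name__)
```

An iterative solver can call `persist` on its inputs once up front and then reuse them on every iteration, rather than recomputing them each time or pulling everything into a single NumPy array.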

Working with the developers at Capital One has been very rewarding. I would
like to find more machine learning groups that fit the following criteria:

1.  Are focused on performance
2.  Need parallel computing
3.  Are committed to building good open source software
4.  Are sufficiently expert in their field to understand correct algorithms

If you know such a person or such a group, either in industry or just a grad
student in university, please encourage them to raise an issue at
http://github.com/dask/dask/issues/new . I will likely write a larger blogpost
on this topic in the near future.
