---
blogpost: true
date: Mar 23, 2017
title: Dask Release 0.14.1
tags: Programming, Python, scipy, dask
---

_This work is supported by [Continuum Analytics](http://continuum.io),
the [XDATA Program](http://www.darpa.mil/program/XDATA),
and the Data Driven Discovery Initiative from the [Moore
Foundation](https://www.moore.org/)._

I'm pleased to announce the release of Dask version 0.14.1. This release
contains a variety of performance and feature improvements. This blogpost
includes some notable features and changes since the last release on February
27th.

As always you can conda install from conda-forge

    conda install -c conda-forge dask distributed

or you can pip install from PyPI

    pip install dask[complete] --upgrade

## Arrays

Recent work in distributed computing and machine learning has motivated new
performance-oriented and usability changes to how we handle arrays.

### Automatic chunking and operation on NumPy arrays

Many interactions between Dask arrays and NumPy arrays work smoothly. NumPy
arrays are made lazy and are appropriately chunked to match the operation
and the Dask array.

```python
>>> x = np.ones(10)                 # a numpy array
>>> y = da.arange(10, chunks=(5,))  # a dask array
>>> z = x + y                       # the combination is a dask.array
>>> z
dask.array<add, shape=(10,), dtype=float64, chunksize=(5,)>

>>> z.compute()
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.])
```

### Reshape

Reshaping distributed arrays is simple in simple cases, and can be quite
complex in complex cases. Reshape now supports a much broader set of shape
transformations where any dimension is collapsed or merged with other dimensions.

```python
>>> x = da.ones((2, 3, 4, 5, 6), chunks=(2, 2, 2, 2, 2))
>>> x.reshape((6, 2, 2, 30, 1))
dask.array<reshape, shape=(6, 2, 2, 30, 1), dtype=float64, chunksize=(3, 1, 2, 6, 1)>
```

This operation ends up being quite useful in a number of distributed array
cases.
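
To make the shape arithmetic in the example above concrete, here is a minimal sketch that pairs up runs of old and new axes covering the same number of elements. The helper `dimension_groups` is hypothetical and written only for illustration; it is not Dask's reshape implementation and it ignores chunking entirely.

```python
from math import prod


def dimension_groups(old_shape, new_shape):
    """Pair runs of old and new axes whose sizes multiply to the same value.

    Hypothetical helper; assumes every dimension is a positive integer.
    """
    if prod(old_shape) != prod(new_shape):
        raise ValueError("shapes have different numbers of elements")
    groups = []
    i = j = 0
    while i < len(old_shape) and j < len(new_shape):
        i0, j0 = i, j
        old_size, new_size = old_shape[i], new_shape[j]
        i, j = i + 1, j + 1
        # Grow whichever side is smaller until both cover the same elements.
        while old_size != new_size:
            if old_size < new_size:
                old_size *= old_shape[i]
                i += 1
            else:
                new_size *= new_shape[j]
                j += 1
        groups.append((old_shape[i0:i], new_shape[j0:j]))
    # Any leftover axes must have length 1; report them as their own group.
    if i < len(old_shape) or j < len(new_shape):
        groups.append((old_shape[i:], new_shape[j:]))
    return groups


groups = dimension_groups((2, 3, 4, 5, 6), (6, 2, 2, 30, 1))
for old_axes, new_axes in groups:
    print(old_axes, "->", new_axes)
```

For the shapes used above, the first two axes merge into one of length 6, the axis of length 4 splits into two of length 2, the last two axes merge into one of length 30, and a trailing axis of length 1 is added.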

### Optimize Slicing to Minimize Communication

Dask.array slicing optimizations are now careful to produce graphs that avoid
situations that could cause excess inter-worker communication. The details of
how they do this are a bit out of scope for a short blogpost, but the history
here is interesting.

Historically dask.arrays were used almost exclusively by researchers with large
on-disk arrays stored as HDF5 or NetCDF files. These users primarily used the
single machine multi-threaded scheduler. We heavily tailored Dask array
optimizations to this situation and made that community pretty happy.
Now as some of that community switches to cluster computing on larger datasets
the optimization goals shift a bit. We have tons of distributed disk bandwidth
but really want to avoid communicating large results between workers.
Supporting both use cases is possible and I think that we've achieved that in
this release so far, but it's starting to require increasing levels of care.

### Micro-optimizations

With distributed computing also come larger graphs and a growing importance of
graph-creation overhead. This has been optimized somewhat in this release. We
expect this to be a focus going forward.

## DataFrames

### Set_index

Set_index is smarter in two ways:

1.  If you set_index on a column that happens to be sorted then we'll identify
    that and avoid a costly shuffle. This was always possible with the `sorted=`
    keyword but users rarely used this feature. Now this is automatic.
2.  Similarly when setting the index we can look at the size of the data and
    determine if there are too many or too few partitions and rechunk the data
    while shuffling. This can significantly improve performance if there are too
    many partitions (a common case).

- [dask/dask #2025](https://github.com/dask/dask/pull/2025)
- [dask/dask #2091](https://github.com/dask/dask/pull/2091)
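
The first point can be illustrated with a small sketch. It treats each partition as a plain Python list of index values and checks whether the data is already globally sorted, in which case partition boundaries can be read off directly and no shuffle is needed. The function name `sorted_divisions` and the list-based partitions are assumptions made for this example; Dask's actual check works on dataframe partitions and is not shown here.

```python
def sorted_divisions(partitions):
    """Return partition boundaries if the values are globally sorted, else None.

    Hypothetical helper; assumes at least one partition and that every
    partition is a non-empty list.
    """
    divisions = []
    previous_max = None
    for part in partitions:
        # Each partition must be sorted internally ...
        if any(a > b for a, b in zip(part, part[1:])):
            return None
        # ... and must not start before the previous partition ended.
        if previous_max is not None and part[0] < previous_max:
            return None
        divisions.append(part[0])
        previous_max = part[-1]
    # Boundaries are each partition's first value plus the final last value.
    divisions.append(previous_max)
    return tuple(divisions)


already_sorted = sorted_divisions([[1, 2, 3], [3, 5], [6, 9]])
needs_shuffle = sorted_divisions([[1, 5], [2, 8]])
print(already_sorted, needs_shuffle)
```

When the check returns boundaries the existing partitions can be kept as they are; when it returns `None` a full shuffle is still required.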

### Shuffle performance

We've micro-optimized some parts of dataframe shuffles. Big thanks to the
Pandas developers for the help here. This accelerates set_index, joins,
groupby-applies, and so on.

- [dask/dask #2032](https://github.com/dask/dask/pull/2032)

### Fastparquet

The [fastparquet](http://fastparquet.readthedocs.io/en/latest/) library has
seen a lot of use lately and has undergone a number of community bugfixes.

Importantly, Fastparquet now supports Python 2.

We strongly recommend Parquet as the standard data storage format for Dask
dataframes (and Pandas DataFrames).

[dask/fastparquet #87](https://github.com/dask/fastparquet/pull/87)

## Distributed Scheduler

### Replay remote exceptions

Debugging is hard in part because exceptions happen on remote machines where
normal debugging tools like `pdb` can't reach. Previously we were able to
bring back the traceback and exception, but you couldn't dive into the stack
trace to investigate what went wrong:

```python
def div(x, y):
    return x / y

>>> future = client.submit(div, 1, 0)
>>> future
<Future: status: error, key: div-4a34907f5384bcf9161498a635311aeb>

>>> future.result()  # getting result re-raises exception locally
<ipython-input-3-398a43a7781e> in div()
      1 def div(x, y):
----> 2     return x / y

ZeroDivisionError: division by zero
```

Now Dask can bring a failing task and all necessary data back to the local
machine and rerun it so that users can leverage the normal Python debugging
toolchain.

```python
>>> client.recreate_error_locally(future)
<ipython-input-3-398a43a7781e> in div(x, y)
      1 def div(x, y):
----> 2     return x / y
ZeroDivisionError: division by zero
```

Now if you're in IPython or a Jupyter notebook you can use the `%debug` magic
to jump into the stacktrace, investigate local variables, and so on.

```python
In [8]: %debug
> <ipython-input-3-398a43a7781e>(2)div()
      1 def div(x, y):
----> 2     return x / y

ipdb> pp x
1
ipdb> pp y
0
```

[dask/distributed #894](https://github.com/dask/distributed/pull/894)
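
The idea of rerunning a failed task locally can be sketched with the standard library alone. The `RecordingExecutor` class below is hypothetical and only remembers how each task was submitted so that it can be called again in the current thread; it is not how Dask implements `recreate_error_locally`, which also has to fetch the task's input data from the workers.

```python
from concurrent.futures import ThreadPoolExecutor


def div(x, y):
    return x / y


class RecordingExecutor:
    """Hypothetical wrapper that remembers how each task was submitted."""

    def __init__(self, executor):
        self._executor = executor
        self._calls = {}

    def submit(self, func, *args, **kwargs):
        future = self._executor.submit(func, *args, **kwargs)
        self._calls[future] = (func, args, kwargs)
        return future

    def recreate_error_locally(self, future):
        # Rerun the same call in the calling thread so that the exception
        # is raised here, where pdb or %debug can inspect the stack.
        func, args, kwargs = self._calls[future]
        return func(*args, **kwargs)


with ThreadPoolExecutor(max_workers=1) as pool:
    executor = RecordingExecutor(pool)
    future = executor.submit(div, 1, 0)
    remote_error = future.exception()  # waits for the task to finish

try:
    executor.recreate_error_locally(future)
    local_error = None
except ZeroDivisionError as exc:
    local_error = exc
```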

### Async/await syntax

Dask.distributed uses Tornado for network communication and Tornado coroutines
for concurrency. Normal users rarely interact with Tornado coroutines; they
aren't familiar to most people so we opted instead to copy the
concurrent.futures API. However some complex situations are _much_ easier to
solve if you know a little bit of async programming.

Fortunately, the Python ecosystem seems to be embracing this change towards
native async code with the async/await syntax in Python 3. In an effort to
motivate people to learn async programming and to gently nudge them towards
Python 3, Dask.distributed now supports async/await in a few cases.

You can wait on a dask Future

```python
async def f():
    future = client.submit(func, *args, **kwargs)
    result = await future
```

You can put the `as_completed` iterator into an async for loop

```python
async for future in as_completed(futures):
    result = await future
    ...  # do stuff with result
```
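
For readers who want to try these two patterns without a cluster, here is a minimal sketch using only the standard library. It substitutes `asyncio` and a `ThreadPoolExecutor` for the Dask client, so the futures here are asyncio futures rather than Dask futures, and `asyncio.as_completed` is used with a plain `for` loop rather than `async for`.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # Await a single future, analogous to `await future` above.
        first = await loop.run_in_executor(pool, inc, 1)

        # Handle several futures in whatever order they finish.
        futures = [loop.run_in_executor(pool, inc, i) for i in range(5)]
        results = []
        for next_done in asyncio.as_completed(futures):
            results.append(await next_done)
    return first, sorted(results)


first, results = asyncio.run(main())
print(first, results)
```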

And, because Tornado supports the await protocols you can also use the existing
shadow concurrency API (everything prefixed with an underscore) with await.
(This was doable before.)

```python
results = client.gather(futures)         # synchronous
...
results = await client._gather(futures)  # asynchronous
```

If you're in Python 2 you can always do this with normal `yield` and
the `tornado.gen.coroutine` decorator.

[dask/distributed #952](https://github.com/dask/distributed/pull/952)

### Inproc transport

In the last release we enabled Dask to communicate over more things than just
TCP. In practice this doesn't come up (TCP is pretty useful). However in this
release we now support single-machine "clusters" where the clients, scheduler,
and workers are all in the same process and transfer data cost-free over
in-memory queues.

This allows the in-memory user community to use some of the more advanced
features (asynchronous computation, spill-to-disk support, web-diagnostics)
that are only available in the distributed scheduler.

This is on by default if you create a cluster with LocalCluster without using
Nanny processes.

```python
>>> from dask.distributed import LocalCluster, Client

>>> cluster = LocalCluster(nanny=False)

>>> client = Client(cluster)

>>> client
<Client: scheduler='inproc://192.168.1.115/8437/1' processes=1 cores=4>

>>> from threading import Lock         # Not serializable
>>> lock = Lock()                      # Won't survive going over a socket
>>> [future] = client.scatter([lock])  # Yet we can send to a worker
>>> future.result()                    # ... and back
<unlocked _thread.lock object at 0x7fb7f12d08a0>
```
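
The lock example works because nothing is serialized along the way. A minimal standard-library sketch of the same effect, using a plain `queue.Queue` as a stand-in for the in-memory transport (an assumption made for illustration, not the actual Dask comm layer):

```python
import pickle
import queue
from threading import Lock

lock = Lock()

# A lock cannot be pickled, so it could not be sent over a socket.
try:
    pickle.dumps(lock)
    picklable = True
except (TypeError, pickle.PicklingError):
    picklable = False

# An in-memory queue hands over a reference to the very same object.
channel = queue.Queue()
channel.put(lock)
received = channel.get()

print(picklable, received is lock)
```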

[dask/distributed #919](https://github.com/dask/distributed/pull/919)

### Connection pooling for inter-worker communications

Workers now maintain a pool of sustained connections between each other. This
pool is of a fixed size and removes connections with a least-recently-used
policy. It avoids re-connection delays when transferring data between workers.
In practice this shaves off a millisecond or two from every communication.
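
A fixed-size pool with least-recently-used eviction can be sketched as follows. The `ConnectionPool` and `FakeConnection` classes are hypothetical and exist only to show the policy; the real worker pool manages actual network connections and is more involved.

```python
from collections import OrderedDict


class FakeConnection:
    """Stand-in for a network connection, used only for this example."""

    def __init__(self, address):
        self.address = address
        self.closed = False

    def close(self):
        self.closed = True


class ConnectionPool:
    """Hypothetical fixed-size pool keyed by address with LRU eviction."""

    def __init__(self, connect, limit=4):
        self._connect = connect  # callable: address -> connection
        self._limit = limit
        self._open = OrderedDict()

    def get(self, address):
        if address in self._open:
            # Reuse the open connection and mark it most recently used.
            self._open.move_to_end(address)
            return self._open[address]
        if len(self._open) >= self._limit:
            # Pool is full: close the least recently used connection.
            _, stale = self._open.popitem(last=False)
            stale.close()
        connection = self._connect(address)
        self._open[address] = connection
        return connection


pool = ConnectionPool(FakeConnection, limit=2)
a = pool.get("worker-a")
b = pool.get("worker-b")
reused = pool.get("worker-a")  # no reconnect; "worker-b" is now the oldest
c = pool.get("worker-c")       # pool is full, so "worker-b" is closed
```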

This is actually a revival of an old feature that we had turned off last year
when it became clear that the performance here wasn't a problem.

Along with other enhancements, this takes our round-trip latency down to 11ms
on my laptop.

```python
In [10]: %%time
    ...: for i in range(1000):
    ...:     future = client.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 4.96 s, sys: 348 ms, total: 5.31 s
Wall time: 11.1 s
```

There may be room for improvement here though. For comparison here is the same
test with the `concurrent.futures.ProcessPoolExecutor`.

```python
In [14]: e = ProcessPoolExecutor(8)

In [15]: %%time
    ...: for i in range(1000):
    ...:     future = e.submit(inc, i)
    ...:     result = future.result()
    ...:
CPU times: user 320 ms, sys: 56 ms, total: 376 ms
Wall time: 442 ms
```

Also, just to be clear, this measures total roundtrip latency, not overhead.
Dask's distributed scheduler overhead remains in the low hundreds of
microseconds.
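
Dividing the wall times above by the 1000 tasks gives roughly 11 ms per round trip for Dask and roughly 0.44 ms for the process pool. The same measurement can be written as a self-contained script. This sketch uses a `ThreadPoolExecutor` so that it runs anywhere without pickling concerns, which is a substitution and not the setup timed above; `mean_roundtrip` is a hypothetical helper, and the numbers it prints will vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


def mean_roundtrip(executor, n=1000):
    """Submit n tasks one at a time and return the mean seconds per task."""
    start = time.perf_counter()
    for i in range(n):
        # Block on each result before submitting the next task, so that we
        # measure submit-to-result latency rather than throughput.
        result = executor.submit(inc, i).result()
        assert result == i + 1
    return (time.perf_counter() - start) / n


with ThreadPoolExecutor(max_workers=8) as pool:
    latency = mean_roundtrip(pool, n=200)

print("mean round trip: %.3f ms" % (latency * 1e3))
```

Any object with a `concurrent.futures`-style `submit` method can be passed in place of the thread pool.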

[dask/distributed #935](https://github.com/dask/distributed/pull/935)

## Related Projects

There has been activity around Dask and machine learning:

- [dask-learn](https://github.com/dask/dask-learn) is undergoing some
  performance enhancements. It turns out that when you offer distributed grid
  search people quickly want to scale up their computations to hundreds of
  thousands of trials.
- [dask-glm](https://github.com/dask/dask-glm) now has a few decent algorithms
  for convex optimization. The authors of this wrote a blogpost very recently
  if you're interested:
  [Developing Convex Optimization Algorithms in Dask](/2017/03/22/dask-glm-1)
- [dask-xgboost](https://github.com/dask/dask-xgboost) lets you hand off
  distributed data in Dask dataframes or arrays directly to a
  distributed XGBoost system (that Dask will nicely set up and tear down for
  you). This was a nice example of easy hand-off between two distributed
  services running in the same processes.

## Acknowledgements

The following people contributed to the dask/dask repository since the 0.14.0 release
on February 27th

- Antoine Pitrou
- Brian Martin
- Elliott Sales de Andrade
- Erik Welch
- Francisco de la Peña
- jakirkham
- Jim Crist
- Jitesh Kumar Jha
- Julien Lhermitte
- Martin Durant
- Matthew Rocklin
- Markus Gonser
- Talmaj

The following people contributed to the dask/distributed repository since the
1.16.0 release on February 27th

- Antoine Pitrou
- Ben Schreck
- Elliott Sales de Andrade
- Martin Durant
- Matthew Rocklin
- Phil Elson
