---
blogpost: true
date: Feb 27, 2017
title: Dask Release 0.14.0
tags: Programming, Python, scipy
---

_This work is supported by [Continuum Analytics](http://continuum.io),
the [XDATA Program](http://www.darpa.mil/program/XDATA),
and the Data Driven Discovery Initiative from the [Moore
Foundation](https://www.moore.org/)_

## Summary

Dask just released version 0.14.0. This release contains some significant
internal changes as well as the usual set of increased API coverage and bug
fixes. This blogpost outlines some of the major changes since the last release
on January 27th, 2017.

1.  Structural sharing of graphs between collections
2.  Refactor communications system
3.  Many small dataframe improvements
4.  Top-level persist function (a short sketch follows this list)
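
On that last point, `dask.persist` is now available as a top-level function,
analogous to `dask.compute`. Here is a minimal sketch of how it can be used;
with the default local scheduler it computes the chunks and keeps them in
memory, while on the distributed scheduler it starts computation in the
background.

```python
import dask
import dask.array as da

x = da.ones(1000000, chunks=(1000,))
y = x + 1

# persist triggers computation but hands back an equivalent dask collection
# backed by the results, rather than a concrete NumPy array like compute().
(y,) = dask.persist(y)
```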

You can install new versions using Conda or Pip

    conda install -c conda-forge dask distributed

or

    pip install dask[complete] distributed --upgrade

## Share Graphs between Collections

Dask collections (arrays, bags, dataframes, delayed) hold onto task graphs that
have all of the tasks necessary to create the desired result. For larger
datasets or complex calculations these graphs may have thousands, or sometimes
even millions of tasks. In some cases the overhead of handling these graphs
can become significant.

This is especially true because dask collections don't modify their graphs in
place, they make new graphs with updated computations. Copying graph data
structures with millions of nodes can take seconds and interrupt interactive
workflows.

To address this, dask.array and dask.delayed collections now use special graph
data structures with structural sharing. This significantly cuts down on
overhead when building repetitive computations.

```python
import dask.array as da

x = da.ones(1000000, chunks=(1000,))  # 1000 chunks of size 1000
```

### Version 0.13.0

```python
%time for i in range(100): x = x + 1
CPU times: user 2.69 s, sys: 96 ms, total: 2.78 s
Wall time: 2.78 s
```

### Version 0.14.0

```python
%time for i in range(100): x = x + 1
CPU times: user 756 ms, sys: 8 ms, total: 764 ms
Wall time: 763 ms
```

The difference in this toy problem is moderate, but for real-world cases this
difference can grow fairly large. This was also one of the blockers that the
climate science community identified as stopping them from handling
petabyte-scale analyses.

We chose to roll this out for arrays and delayed first just because those are
the two collections that typically produce large task graphs. Dataframes and
bags remain as before for the time being.
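
As a rough illustration of what structural sharing buys us, the graph of a
derived collection refers to the original collection's tasks rather than
copying them. A minimal sketch (exact task counts depend on chunking):

```python
import dask.array as da

x = da.ones(1000000, chunks=(1000,))  # one task per chunk: 1000 tasks
y = x + 1                             # adds another 1000 tasks

# y's graph references x's existing tasks instead of copying them,
# so repeatedly deriving new collections stays cheap.
print(len(x.dask), len(y.dask))       # roughly 1000 and 2000
```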

## Communications System

Dask communicates over TCP sockets. It uses Tornado's IOStreams to handle
non-blocking communication, framing, etc. We've run into some performance
issues with Tornado when moving large amounts of data. Some of this [has been
improved upstream](https://github.com/tornadoweb/tornado/pull/1873) in Tornado
directly, but we still want the ability to optionally drop Tornado's
byte-handling communication stack in the future. This is especially important
as dask gets used in institutions with faster and more exotic interconnects
(supercomputers). We've been asked a few times to support other transport
mechanisms like MPI.

The first step (and probably the hardest step) was to make Dask's communication
system pluggable so that we can use different communication options without
significant source-code changes. We managed this a month ago and now it is
possible to add other transports to Dask relatively easily. TCP remains the
only real choice today, though there is also an experimental ZeroMQ option
(which provides little-to-no performance benefit over TCP) as well as a fully
in-memory option in development.

For users the main difference you'll see is that `tcp://` is now prepended in many
places. For example:

```
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:  tcp://192.168.1.115:8786
...
```
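
The same scheme appears when connecting a client. A minimal sketch, reusing
the scheduler address from the log output above:

```python
from dask.distributed import Client

# The transport scheme is now an explicit part of the address.
# TCP is the default, so plain host:port addresses continue to work.
client = Client('tcp://192.168.1.115:8786')
```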

## Variety of Dataframe Changes

As usual the Pandas API has been more fully covered by community contributors.
Some representative changes include the following:

1.  Support non-uniform categoricals: We no longer need to do a full pass
    through the data when categorizing a column. Instead we categorize each
    partition independently (even if they have different category values) and
    then unify these categories only when necessary.

    ```python
    df['x'] = df['x'].astype('category')  # this is now fast
    ```

2.  Groupby cumulative reductions

    ```python
    df.groupby('x').cumsum()
    ```

3.  Support appending to Parquet collections

    ```python
    df.to_parquet('/path/to/foo.parquet', append=True)
    ```

4.  A new string and HTML representation of dask.dataframes. Typically Pandas
    prints dataframes on the screen by rendering the first few rows of data.
    However, because dask.dataframes are lazy we don't have this data, and so we
    typically render some metadata about the dataframe instead.

    ```python
    >>> df  # version 0.13.0
    dd.DataFrame<make-ti..., npartitions=366, divisions=(Timestamp('2000-01-01
    00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D'),
    Timestamp('2000-01-03 00:00:00', freq='D'), ..., Timestamp('2000-12-31
    00:00:00', freq='D'), Timestamp('2001-01-01 00:00:00', freq='D'))>
    ```

    This rendering, while informative, can be improved. Now we render the
    dask.dataframe as a Pandas dataframe, but place metadata in it instead of
    the actual data.

    ```python
    >>> df  # version 0.14.0
    Dask DataFrame Structure:
                           x        y      z
    npartitions=366
    2000-01-01       float64  float64  int64
    2000-01-02           ...      ...    ...
    ...                  ...      ...    ...
    2000-12-31           ...      ...    ...
    2001-01-01           ...      ...    ...
    Dask Name: make-timeseries, 366 tasks
    ```

    Additionally, this renders nicely as an HTML table in a Jupyter notebook.
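
    For reference, the `make-timeseries` name in the output above comes from a
    demo dataframe. Below is a minimal sketch of one way to build something
    similar; the `dd.demo.make_timeseries` helper and its exact signature are
    an assumption here and may differ between versions:

    ```python
    import dask.dataframe as dd

    # Build a lazy year-long timeseries with daily partitions (hypothetical
    # arguments; adjust to the signature available in your version).
    df = dd.demo.make_timeseries('2000-01-01', '2001-01-01',
                                 {'x': float, 'y': float, 'z': int},
                                 freq='1s', partition_freq='1D')

    df          # renders the structure table without computing any data
    df.head()   # computes only the first partition to show real values
    ```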

## Variety of Distributed System Changes

There have also been a wide variety of changes to the distributed system. I'll
include a representative sample here to give a flavor of what has been
happening:

1.  Ensure first-come-first-served priorities when dealing with multiple
    clients
2.  Send small amounts of data through Channels. Channels are a way for
    multiple clients/users connected to the same scheduler to publish and
    exchange data between themselves. Previously they only transmitted Futures
    (which could in turn point to larger data living on the cluster). However
    we found that it was useful to communicate small bits of metadata as well,
    for example to signal progress or stopping criteria between clients
    collaborating on the same workloads. Now you can publish any
    msgpack-serializable data on Channels.

    ```python
    # Publishing Client
    scores = client.channel('scores')
    scores.append(123.456)

    # Subscribing Client
    scores = client.channel('scores')
    while scores.data[-1] < THRESHOLD:
        ... continue working ...
    ```

3.  We're better at estimating the size in bytes of SciPy sparse matrices and
    Keras models. This allows Dask to make smarter choices about when it
    should and should not move data around for load balancing. Additionally
    Dask can now also serialize Keras models.
4.  To help people deploying on clusters that have a shared network file system
    (as is often the case in scientific or academic institutions), the scheduler
    and workers can now communicate connection information using the
    `--scheduler-file` keyword:

    ```
    dask-scheduler --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json

    >>> client = Client(scheduler_file='/path/to/scheduler.json')
    ```

    Previously we needed to communicate the address of the scheduler, which
    could be challenging when we didn't know on which node the scheduler would
    be run.

## Other

There are a number of smaller details not mentioned in this blogpost. For more
information visit the changelogs and documentation:

- [Dask/dask docs](http://dask.pydata.org/en/latest/)
- [Dask/dask changelog](http://dask.pydata.org/en/latest/changelog.html)
- [Dask/distributed docs](http://distributed.readthedocs.org/en/latest/)
- [Dask/distributed changelog](http://distributed.readthedocs.org/en/latest/changelog.html)

Additionally a great deal of Dask work over the last month has happened
_outside_ of these core dask repositories.

You can install or upgrade using Conda or Pip

    conda install -c conda-forge dask distributed

or

    pip install dask[complete] distributed --upgrade

## Acknowledgements

Since the last 0.13.0 release on January 27th the following developers have
contributed to the dask/dask repository:

- Antoine Pitrou
- Chris Barber
- Daniel Davis
- Elmar Ritsch
- Erik Welch
- jakirkham
- Jim Crist
- John Crickett
- jspreston
- Juan Luis Cano Rodríguez
- kayibal
- Kevin Ernst
- Markus Gonser
- Matthew Rocklin
- Martin Durant
- Nir
- Sinhrks
- Talmaj Marinc
- Vlad Frolov
- Will Warner

And the following developers have contributed to the dask/distributed
repository:

- Antoine Pitrou
- Ben Schreck
- bmaisonn
- Brett Naul
- Demian Wassermann
- Israel Saeta Pérez
- John Crickett
- Joseph Crail
- Malte Gerken
- Martin Durant
- Matthew Rocklin
- Min RK
- strets123
