---
blogpost: true
date: Aug 2, 2019
title: Dask Release 2.2.0
tags: release
---

I'm pleased to announce the release of Dask version 2.2.
This is a significant release with bug fixes and new features.
The last blogged release was 2.0 on 2019-06-22.
This blogpost outlines notable changes since the last post.

You can conda install Dask:

    conda install dask

or pip install from PyPI:

    pip install dask[complete] --upgrade

Full changelogs are available here:

- [dask/dask](https://github.com/dask/dask/blob/master/docs/source/changelog.rst)
- [dask/distributed](https://github.com/dask/distributed/blob/master/docs/source/changelog.rst)

## Notable Changes

As always there are too many changes to list here,
instead we'll highlight a few that readers may find interesting,
or that break old behavior.
In particular we discuss the following:

1. Parquet rewrite
2. Nicer HTML output for Clients and Logs
3. Hyper-parameter selection with Hyperband in Dask-ML
4. Move bytes I/O handling out of Dask to FSSpec
5. async/await everywhere, and cleaner setup for developers
6. A new SSH deployment solution

## 1 - Parquet Rewrite

Today Dask DataFrame can read and write Parquet data using either
[fastparquet](https://fastparquet.readthedocs.io) or
[Apache Arrow](https://arrow.apache.org/).

```python
import dask.dataframe as dd

df = dd.read_parquet("/path/to/mydata.parquet", engine="arrow")
# or
df = dd.read_parquet("/path/to/mydata.parquet", engine="fastparquet")
```

Supporting both libraries within Dask has been helpful for
users, but introduced some maintenance burden, especially given that each
library co-evolved with Dask dataframe over the years. The contract between
Dask Dataframe and these libraries was convoluted, making it difficult to
evolve swiftly.

To address this we've formalized what Dask expects of Parquet reader/writers
into a more formal Parquet Engine contract. This keeps maintenance costs
low, enables independent development for each project, and allows for new
engines to emerge.

Already a GPU-accelerated Parquet reader is available in a PR on the [RAPIDS
cuDF](https://github.com/rapidsai/cudf/pull/2368) library.

As a result, we've also been able to fix a number of long-standing bugs, and
improve the functionality with both engines.

Some fun quotes from [Sarah Bird](https://github.com/birdsarah) during development

> I am currently testing this. So far so good. I can load my dataset in a few seconds with 1800 partitions. Game changing!

and

> I am now successfully working on a dataset with 74,000 partitions and no metadata.
> Opening dataset and df.head() takes 7 - 30s. (Presumably depending on whether s3fs cache is cold or not). THIS IS HUGE! This was literally impossible before.

The API remains the same, but functionality should be smoother.

Thanks to [Rick Zamora](https://github.com/rjzamora), [Martin
Durant](https://github.com/martindurant) for doing most of the work here and to
[Sarah Bird](https://github.com/birdsarah), [Wes
McKinney](https://github.com/wesm), and [Mike
McCarty](https://github.com/mmccarty) for providing guidance and review.

## 2 - Nicer HTML output for Clients and Logs

```python
from dask.distributed import Client
client = Client()
```

<table style="border: 2px solid white;">
<tr>
<td style="vertical-align: top; border: 0px solid white">
<h3 style="text-align: left;">Client</h3>
<ul style="text-align: left; list-style: none; margin: 0; padding: 0;">
  <li><b>Scheduler: </b>tcp://127.0.0.1:60275</li>
  <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>
</ul>
</td>
<td style="vertical-align: top; border: 0px solid white">
<h3 style="text-align: left;">Cluster</h3>
<ul style="text-align: left; list-style:none; margin: 0; padding: 0;">
  <li><b>Workers: </b>4</li>
  <li><b>Cores: </b>12</li>
  <li><b>Memory: </b>17.18 GB</li>
</ul>
</td>
</tr>
</table>

```python
client.cluster.logs()
```

<div markdown="0">
<details>
<summary>Scheduler</summary>
<pre><code>distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:60275
distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8787
distributed.scheduler - INFO - Register tcp://127.0.0.1:60281
distributed.scheduler - INFO - Register tcp://127.0.0.1:60282
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60281
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60282
distributed.scheduler - INFO - Register tcp://127.0.0.1:60285
distributed.scheduler - INFO - Register tcp://127.0.0.1:60286
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60285
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:60286
distributed.scheduler - INFO - Receive client connection: Client-6b6ba1d0-b3bd-11e9-9bd0-acde48001122</code></pre>
</details>

<details>
<summary>tcp://127.0.0.1:60281</summary>
<pre><code>distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60281
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60281
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-c4_44fym
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------</code></pre>
</details>
<details>
<summary>tcp://127.0.0.1:60282</summary>
<pre><code>distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60282
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60282
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-quu4taje
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------</code></pre>
</details>
<details>
<summary>tcp://127.0.0.1:60285</summary>
<pre><code>distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60285
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60285
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-ll4cozug
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------</code></pre>
</details>
<details><summary>tcp://127.0.0.1:60286</summary><pre><code>distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:60286
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:60286
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          3
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /Users/mrocklin/workspace/dask/dask-worker-space/worker-lpbkkzj6
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:60275
distributed.worker - INFO - -------------------------------------------------</code></pre></details>
</div>

_Note: this looks better under any browser other than IE and Edge_

Thanks to [Jacob Tomlinson](https://github.com/jacobtomlinson) for this work.

## 3 - Hyperparameter selection with HyperBand

Dask-ML 1.0 has been released with a new `HyperBandSearchCV` meta-estimator for
hyper-parameter optimization. This can be used as an alternative to
`RandomizedSearchCV` to find similar hyper-parameters in less time by not
wasting time on hyper-parameters that are not promising.

```python
>>> import numpy as np
>>> from dask_ml.model_selection import HyperbandSearchCV
>>> from dask_ml.datasets import make_classification
>>> from sklearn.linear_model import SGDClassifier

>>> X, y = make_classification(chunks=20)
>>> est = SGDClassifier(tol=1e-3)
>>> param_dist = {'alpha': np.logspace(-4, 0, num=1000),
>>>               'loss': ['hinge', 'log', 'modified_huber', 'squared_hinge'],
>>>               'average': [True, False]}

>>> search = HyperbandSearchCV(est, param_dist)
>>> search.fit(X, y, classes=np.unique(y))
>>> search.best_params_
{'loss': 'log', 'average': False, 'alpha': 0.0080502}
```

Thanks to [Scott Sievert](http://github.com/stsievert).
You can see Scott talk about this topic in greater depth by watching his
[SciPy 2019 talk](https://youtu.be/x67K9FiPFBQ).

## 4 - Move bytes I/O handling out of Dask to FSSpec

We've spun out Dask's internal code to read and write raw data to different
storage systems out to a separate project, [fsspec](https://fsspec.readthedocs.io).

Here is a small example:

```python
import fsspec

with fsspec.open("https://github.com/dask/dask/edit/master/README.rst") as f:
    print(f.read(1000))

with fsspec.open("s3://bucket/myfile.csv") as f:
    df = pd.read_csv(f)

with fsspec.open("hdfs:///path/to/myfile.csv") as f:
    df = pd.read_csv(f)

with fsspec.open("gcs://bucket/myfile.csv") as f:
    df = pd.read_csv(f)
```

Dask's I/O infrastructure to read and write bytes from systems like
HDFS, S3, GCS, Azure, and other remote storage systems is arguably the most
uniform and comprehensive in Python today. Through tooling like
[s3fs](https://s3fs.readthedocs.io), [gcsfs](https://gcsfs.readthedocs.io),
and ~~hdfs3~~ [pyarrow.hdfs](https://arrow.apache.org/docs/python/filesystems.html),
it's easy to read and write data in a Pythonic way to a
variety of remote storage systems.

Early on we decided that we wanted this code to live outside of the mainline
Dask codebase, which is why they are independent projects.
This choice allowed other libraries, like Pandas, Zarr, and others to benefit
from this work, without a strict dependency on Dask.
However, there was still code within Dask that helped to unify them a bit.
We've moved this code out to an external project,
[fsspec](https://filesystem-spec.readthedocs.io/en/latest) which includes all
of the centralization code that Dask used to provide, as well as a formal
specification for what a remote data system should look like in order to be
compatible. This also helps to unify efforts with other projects like Arrow.

Special thanks to [Martin Durant](https://github.com/martindurant) for
shepherding Dask's I/O infrastructure over the years, and for doing the more
immediate work of splitting out `fsspec`.

You can read more about FSSpec and its transition out of Dask
[here](https://blog.dask.org/2019/07/23/extracting-fsspec-from-dask).

## 5 - Async/Await everywhere, and cleaner setup for developers

In Dask 2.0 we dropped Python 2 support and now support only Python 3.5 and
above.
This allows us to adopt async and await syntax for concurrent execution rather
than an older coroutine based approach with `yield`. The differences here
started out as largely aesthetic, but triggered a number of substantive
improvements as we walked through the codebase cleaning things up. Starting
and stopping internal Scheduler, Worker, Nanny, and Client objects is now far
more uniform, reducing the presence of subtle bugs.

This is discussed in more detail in the [Python API setup
documentation](https://docs.dask.org/en/latest/setup/python-advanced.html) and
is encapsulated in this code example from those docs:

```python
import asyncio

from dask.distributed import Scheduler, Worker, Client

async def f():
    async with Scheduler() as s:
        async with Worker(s.address) as w1, Worker(s.address) as w2:
            async with Client(s.address, asynchronous=True) as client:
                future = client.submit(lambda x: x + 1, 10)
                result = await future
                print(result)

asyncio.get_event_loop().run_until_complete(f())
```

As a result of this and other internal cleanup intermittent testing failures in
our CI have disappeared, and developer mood is high :)

## 6 - A new SSHCluster

We've added a second SSH cluster deployment solution. It looks like this:

```python
from distributed.deploy.ssh2 import SSHCluster  # this will move in future releases

cluster = SSHCluster(
    hosts=["host1", "host2", "host3", "host4"],
    # hosts=["localhost"] * 4  # if you want to try this out locally,
    worker_kwargs={"nthreads": 4},
    scheduler_kwargs={},
    connect_kwargs={"known_hosts": None}
)
```

_Note that this object is experimental, and subject to change without notice_

We worked on this for two reasons:

1. Our user survey showed that a surprising number of people were deploying
   Dask with SSH. Anecdotally they seem to be just SSHing into machines and
   then using Dask's normal [Dask Command Line
   Interface](https://docs.dask.org/en/latest/setup/cli.html))

   We wanted a solution that was easier than this.

2. We've been trying to unify the code in the various deployment solutions
   (like Kubernetes, SLURM, Yarn/Hadoop) to a central codebase, and having a
   simple SSHCluster as a test case has proven valuable for testing and
   experimentation.

_Also note, Dask already has a
[dask-ssh](https://docs.dask.org/en/latest/setup/ssh.html) solution today that is more mature_

We expect that unification of deployment will be a central theme for the next
few months of development.

## Acknowledgements

There have been two releases since the last time we had a release blogpost.
The following people contributed to the following repositories since the 2.0
release on June 30th:

- [dask/dask](https://github.com/dask/dask)
  - Brett Naul
  - Daniel Saxton
  - David Brochart
  - Davis Bennett
  - Elliott Sales de Andrade
  - GALI PREM SAGAR
  - James Bourbeau
  - Jim Crist
  - Loïc Estève
  - Martin Durant
  - Matthew Rocklin
  - Matthias Bussonnier
  - Natalya Rapstine
  - Nick Becker
  - Peter Andreas Entschev
  - Ralf Gommers
  - Richard (Rick) Zamora
  - Sarah Bird
  - Sean McKenna
  - Tom Augspurger
  - Willi Rath
  - Xavier Holt
  - andrethrill
  - asmith26
  - msbrown47
  - tshatrov
- [dask/distributed](https://github.com/dask/distributed)
  - Christian Hudon
  - Gabriel Sailer
  - Jacob Tomlinson
  - James Bourbeau
  - Jim Crist
  - Martin Durant
  - Matthew Rocklin
  - Pierre Glaser
  - Russ Bubley
  - tjb900
- [dask/dask-jobqueue](https://github.com/dask/dask-jobqueue)
  - Guillaume Eynard-Bontemps
  - Leo Singer
  - Loïc Estève
  - Matthew Rocklin
  - Stuart Berg
- [dask/dask-examples](https://github.com/dask/dask-examples)
  - Chris White
  - Ian Rose
  - Matthew Rocklin
- [dask/dask-mpi](https://github.com/dask/dask-mpi)
  - Anderson Banihirwe
  - Kevin Paul
  - Matthew Rocklin
- [dask/dask-kubernetes](https://github.com/dask/dask-kubernetes)
  - Matthew Rocklin
  - Tom Augspurger
- [dask/dask-ml](https://github.com/dask/dask-ml)
  - Roman Yurchak
  - Tom Augspurger
- [dask/dask-yarn](https://github.com/dask/dask-yarn)
  - Al Johri
  - Jim Crist
- [dask/dask-examples](https://github.com/dask/dask-examples)
