---
blogpost: true
date: Apr 28, 2017
title: Dask Development Log
tags: Programming, Python, scipy
---

_This work is supported by [Continuum Analytics](http://continuum.io) and the
Data Driven Discovery Initiative from the [Moore
Foundation](https://www.moore.org/)_

To increase transparency I'm blogging weekly(ish) about the work done on Dask
and related projects during the previous week. This log covers work done
between 2017-04-20 and 2017-04-28. Nothing here is ready for production. This
blogpost is written in haste, so refined polish should not be expected.

Development in Dask and Dask-related projects during the last week includes the
following notable changes:

1.  Improved **Joblib** support, accelerating existing Scikit-Learn code
2.  A **dask-glm** powered **LogisticRegression** estimator that is scikit-learn
    compatible
3.  Additional **Parquet** support by **Arrow**
4.  **Sparse arrays**
5.  Better spill-to-disk behavior
6.  **AsyncIO** compatible Client
7.  **TLS (SSL)** support
8.  NumPy `__array_ufunc__` protocol

### Joblib

Scikit-Learn parallelizes most of its algorithms with
[Joblib](https://pythonhosted.org/joblib/), which provides a simple interface
for embarrassingly parallel computations. Dask has been able to [hijack
joblib](http://distributed.readthedocs.io/en/latest/joblib.html) code and
serve as the backend for some time now, but it had some limitations,
particularly because we would repeatedly send data back and forth from a
worker to client for every batch of computations.

```python
import distributed.joblib
from joblib import Parallel, parallel_backend

with parallel_backend('dask.distributed', scheduler_host='HOST:PORT'):
    ...  # normal Joblib code goes here
```

Now there is a `scatter=` keyword, which allows you to pre-scatter select
variables out to all of the Dask workers. This significantly cuts down on
overhead, especially on machine learning workloads where most of the data
doesn't change very much.

```python
# Send the training data only once to each worker
with parallel_backend('dask.distributed', scheduler_host='localhost:8786',
                      scatter=[digits.data, digits.target]):
    search.fit(digits.data, digits.target)
```
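As a rough back-of-the-envelope sketch of why this helps, the snippet below compares the bytes shipped when the training data travels with every batch against the bytes shipped when it is scattered once per worker. The worker count and batch count are made-up numbers for illustration, and pickle stands in for whatever serialization is actually used on the wire.

```python
import pickle

import numpy as np

# Hypothetical workload: every number below is an illustrative assumption.
data = np.zeros((1797, 64))   # same shape as scikit-learn's digits.data
n_workers = 4
n_batches = 200               # batches of Joblib tasks sent during a search

payload = len(pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL))

# Without scatter: the data travels along with every batch.
bytes_without_scatter = n_batches * payload

# With scatter: the data travels once to each worker.
bytes_with_scatter = n_workers * payload

ratio = bytes_without_scatter / bytes_with_scatter
print(f"payload: {payload / 1e6:.2f} MB, traffic reduced {ratio:.0f}x")
```

The saving grows with the number of batches per worker, which is why workloads that reuse the same training data many times benefit the most.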

Early trials indicate that computations like scikit-learn's RandomForest scale
nicely on a cluster without any additional code.

This is particularly nice because it allows Dask and Scikit-Learn to play well
together without having to introduce Dask within the Scikit-Learn codebase at
all. From a maintenance perspective this combination is very attractive.

Work done by [Jim Crist](http://jcrist.github.io/) in [dask/distributed #1022](https://github.com/dask/distributed/pull/1022)

### Dask-GLM Logistic Regression

The convex optimization solvers in the
[dask-glm](https://github.com/dask/dask-glm) project allow us to solve common
machine learning and statistics problems in parallel and at scale.
Historically this young library has contained only optimization solvers and
relatively little in the way of user API.

This week dask-glm grew new LogisticRegression and LinearRegression estimators
that expose the scalable convex optimization algorithms within dask-glm through
a Scikit-Learn compatible interface. This can both speed up solutions on a
single computer and provide solutions for datasets that were previously too
large to fit in memory.

```python
from dask_glm.estimators import LogisticRegression

est = LogisticRegression()
est.fit(my_dask_array, labels)
```
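For readers who want a feel for the kind of convex problem being solved, here is a minimal sketch of logistic regression fit by plain gradient descent in NumPy. This is not dask-glm's implementation (its solvers are more sophisticated); the function names, learning rate, and synthetic data are all assumptions made for illustration.

```python
import numpy as np


def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))


def fit_logistic(X, y, learning_rate=0.5, n_iter=500):
    """Plain gradient descent on the mean logistic loss (no regularization)."""
    beta = np.zeros(X.shape[1])
    for _ in range(n_iter):
        # Gradient of the mean negative log-likelihood: X^T (p - y) / n
        gradient = X.T @ (sigmoid(X @ beta) - y) / len(y)
        beta -= learning_rate * gradient
    return beta


# Synthetic data drawn from a known coefficient vector.
rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))
true_beta = np.array([2.0, -1.0, 0.5])
y = (rng.random(1000) < sigmoid(X @ true_beta)).astype(float)

beta = fit_logistic(X, y)
accuracy = np.mean((sigmoid(X @ beta) > 0.5) == (y == 1))
print(beta, accuracy)
```

Each iteration is dominated by the two matrix-vector products `X @ beta` and `X.T @ (...)`, which are the sort of operations that can be split across chunks of a large array.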

[This notebook](http://nbviewer.jupyter.org/gist/anonymous/15742155693794ddd31ea85b654cbc7e)
compares performance to the latest release of scikit-learn on a 5,000,000
dataset running on a single machine. Dask-glm beats scikit-learn by a factor
of four, which is also roughly the number of cores on the development machine.
However, in response, [this
notebook](http://nbviewer.jupyter.org/gist/ogrisel/5f2d31bc5e7df852b4ca63f5f6049f42)
by [Olivier Grisel](http://ogrisel.com/) shows the development version of
scikit-learn (with a new algorithm) beating out dask-glm by a factor of six.
This just goes to show you that being smarter about your algorithms is almost
always a better use of time than adopting parallelism.

Work done by [Tom Augspurger](https://tomaugspurger.github.io/) and [Chris
White](https://github.com/moody-marlin/) in
[dask/dask-glm #40](https://github.com/dask/dask-glm/pull/40)

### Parquet with Arrow

The Parquet format is quickly becoming a standard for parallel and distributed
dataframes. There are currently two Parquet reader/writers accessible from
Python: [fastparquet](http://fastparquet.readthedocs.io/en/latest/), a
NumPy/Numba solution, and [Parquet-CPP](https://github.com/apache/parquet-cpp), a
C++ solution with wrappers provided by [Arrow](https://arrow.apache.org/).
Dask.dataframe has supported Parquet for a while now with fastparquet.

However, users will now have an option to use Arrow instead by switching the
`engine=` keyword in the `dd.read_parquet` function.

```python
import dask.dataframe as dd

df = dd.read_parquet('/path/to/mydata.parquet', engine='fastparquet')
df = dd.read_parquet('/path/to/mydata.parquet', engine='arrow')
```

Hopefully this capability increases the use of both projects and results in
greater feedback to those libraries so that they can continue to advance
Python's access to the Parquet format. As a gentle reminder, you can typically
get _much_ faster query times by switching from CSV to Parquet. This is often
much more effective than parallel computing.

Work by [Wes McKinney](http://wesmckinney.com/) in [dask/dask
#2223](https://github.com/dask/dask/pull/2223).

### Sparse Arrays

There is a small multi-dimensional sparse array library here:
[https://github.com/mrocklin/sparse](https://github.com/mrocklin/sparse). It
allows us to represent arrays compactly in memory when most entries are zero.
This differs from the standard solution in
[scipy.sparse](https://docs.scipy.org/doc/scipy-0.19.0/reference/sparse.html),
which can only support arrays of dimension two (matrices) and not greater.

    pip install sparse

```python
>>> import numpy as np
>>> x = np.random.random(size=(10, 10, 10, 10))
>>> x[x < 0.9] = 0
>>> x.nbytes
80000

>>> import sparse
>>> s = sparse.COO(x)
>>> s
<COO: shape=(10, 10, 10, 10), dtype=float64, nnz=1074>

>>> s.nbytes
12888

>>> sparse.tensordot(s, s, axes=((1, 0, 3), (2, 1, 0))).sum(axis=1)
array([ 100.93868073,  128.72312323,  119.12997217,  118.56304153,
        133.24522101,   98.33555365,   90.25304866,   98.99823973,
        100.57555847,   78.27915528])
```
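To make the memory savings concrete, here is a small sketch of the coordinate (COO) layout using only NumPy: one column of coordinates plus one value for each nonzero entry. The `uint8` coordinate dtype is my assumption for this toy example and is only valid while every dimension is short enough to fit in a byte; this is a sketch of the idea, not the library's internals.

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random(size=(10, 10, 10, 10))
x[x < 0.9] = 0

# One coordinate column and one value per nonzero entry.
# uint8 coordinates are an assumption that only holds for short dimensions.
coords = np.stack(np.nonzero(x)).astype(np.uint8)   # shape (ndim, nnz)
data = x[tuple(coords)]                              # shape (nnz,)

nnz = data.size
coo_nbytes = coords.nbytes + data.nbytes             # 4 + 8 bytes per nonzero

# Rebuild the dense array to check that nothing was lost.
dense = np.zeros(x.shape, dtype=x.dtype)
dense[tuple(coords)] = data

print(x.nbytes, coo_nbytes, nnz)
```

At 12 bytes per nonzero, this layout is consistent with the 12888 bytes reported for 1074 nonzeros in the session above.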

Additionally, this `sparse` library more faithfully follows the `numpy.ndarray`
API, which is exactly what `dask.array` expects. Because of this close API
matching, dask.array is able to parallelize around sparse arrays just as easily
as it parallelizes around dense numpy arrays. This gives us a decent
distributed multidimensional sparse array library relatively cheaply.

```python
>>> import dask.array as da
>>> x = da.random.random(size=(10000, 10000, 10000, 10000),
...                      chunks=(100, 100, 100, 100))
>>> x[x < 0.9] = 0

>>> s = x.map_blocks(sparse.COO)  # parallel array of sparse arrays
```

Work on the `sparse` library is so far by [myself](http://matthewrocklin.com/)
and [Jake VanderPlas](https://staff.washington.edu/jakevdp/) and is available
[here](https://github.com/mrocklin/sparse). Work connecting this up to
Dask.array is in [dask/dask #2234](https://github.com/dask/dask/pull/2234).

### Better spill to disk behavior

I've been playing with a 50GB sample of the 1TB [Criteo
dataset](http://labs.criteo.com/2013/12/download-terabyte-click-logs-2/) on my
laptop (this is where I'm using sparse arrays). To make computations flow a
bit faster I've improved the performance of Dask's spill-to-disk policies.

Now, rather than depend on (cloud)pickle we use Dask's network protocol, which
handles data more efficiently, compresses well, and has special handling for
common and important types like NumPy arrays and things built out of NumPy
arrays (like sparse arrays).
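The general idea of special-casing NumPy arrays can be sketched as follows: store a tiny header describing the dtype and shape, and keep the raw buffer as a separate compressed frame. This is only an illustration of the approach, not Dask's actual protocol; the function names are hypothetical and zlib stands in for whichever compressor is really used.

```python
import json
import pickle
import zlib

import numpy as np


def dumps_array(arr):
    """Serialize an array as a small JSON header plus compressed raw bytes."""
    arr = np.ascontiguousarray(arr)
    header = json.dumps({"dtype": arr.dtype.str, "shape": arr.shape}).encode()
    frame = zlib.compress(arr.tobytes(), 1)
    return header, frame


def loads_array(header, frame):
    """Rebuild the array from the header and the compressed frame."""
    meta = json.loads(header.decode())
    flat = np.frombuffer(zlib.decompress(frame), dtype=meta["dtype"])
    return flat.reshape(meta["shape"])


x = np.zeros((1000, 1000))
x[::10, ::10] = 1.0   # mostly zeros, so it compresses well

header, frame = dumps_array(x)
restored = loads_array(header, frame)

pickled_size = len(pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL))
framed_size = len(header) + len(frame)
print(pickled_size, framed_size)
```

How much this saves depends entirely on the data: mostly-zero arrays like the one above shrink dramatically, while dense random data would barely compress.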

As a result, reading and writing excess data to disk is significantly faster.
When performing machine learning computations (which are fairly heavy-weight),
disk access is now fast enough that I don't notice it in practice, and running
out of memory doesn't significantly impact performance.

This is only really relevant when using common types (like numpy arrays) and
when your computation to disk access ratio is relatively high (such as is the
case for analytic workloads), but it was a simple fix and yielded a nice boost
to my personal productivity.

Work by myself in [dask/distributed #946](https://github.com/dask/distributed/pull/946).

### AsyncIO compatible Client

The Dask.distributed scheduler maintains a fully asynchronous API for use with
non-blocking systems like Tornado or AsyncIO. Because Dask supports Python 2
all of our internal code is written with Tornado. While Tornado and AsyncIO
can work together, this generally requires a bit of excess book-keeping, like
turning Tornado futures into AsyncIO futures, etc.
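To give a flavor of that book-keeping without pulling in Tornado, here is a standard-library analogy: a `concurrent.futures` future is not awaitable, so it has to be wrapped before AsyncIO code can `await` it. The thread pool here is only a stand-in for a foreign future type; bridging Tornado futures uses different helpers but has the same shape.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_square(n):
    return n * n


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        foreign_future = pool.submit(blocking_square, 7)   # not awaitable
        # The book-keeping step: wrap the foreign future for asyncio.
        native_future = asyncio.wrap_future(foreign_future)
        return await native_future


result = asyncio.run(main())
print(result)
```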

Now there is an AsyncIO specific Client that only includes non-blocking methods
that are AsyncIO native. This allows for more idiomatic asynchronous code in
Python 3.

```python
async with AioClient('scheduler-address:8786') as c:
    future = c.submit(func, *args, **kwargs)
    result = await future
```

Work by [Krisztián Szűcs](https://github.com/kszucs) in [dask/distributed
#1029](https://github.com/dask/distributed/pull/1029).

### TLS (SSL) support

TLS (previously called SSL) is a common and trusted solution to authentication
and encryption. It is commonly requested by companies or institutions where
intra-network security is important. This is currently being worked on at
[dask/distributed
#1034](https://github.com/dask/distributed/pull/1034). I encourage anyone whom
this may affect to engage on that pull request.

Work by [Antoine Pitrou](https://github.com/pitrou) in [dask/distributed
#1034](https://github.com/dask/distributed/pull/1034) and previously by [Marius
van Niekerk](https://github.com/mariusvniekerk) in [dask/distributed
#866](https://github.com/dask/distributed/pull/866).

### NumPy `__array_ufunc__`

This recent change in NumPy (literally merged as I was typing this blogpost)
allows other array libraries to take control of the existing NumPy ufuncs,
so if you call something like `np.exp(my_dask_array)` this will no longer
convert to a NumPy array, but will rather call the appropriate
`dask.array.exp` function. This is a big step towards writing generic array
code that works both on NumPy arrays as well as other array projects like
dask.array, xarray, bcolz, sparse, etc.
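Here is a minimal sketch of how a library can hook into this protocol. `LoggedArray` is a toy class invented for this example (it is not part of NumPy or Dask), it only handles plain ufunc calls, and it ignores details like the `out=` keyword that a real implementation would need to deal with.

```python
import numpy as np


class LoggedArray:
    """Toy array wrapper that intercepts NumPy ufuncs."""

    def __init__(self, values):
        self.values = np.asarray(values)
        self.calls = []

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        if method != "__call__":
            return NotImplemented
        # Unwrap any LoggedArray inputs, apply the ufunc, and re-wrap.
        raw = [i.values if isinstance(i, LoggedArray) else i for i in inputs]
        result = LoggedArray(ufunc(*raw, **kwargs))
        result.calls = self.calls + [ufunc.__name__]
        return result


x = LoggedArray([0.0, 1.0, 2.0])
y = np.exp(x)        # dispatches to LoggedArray.__array_ufunc__
z = np.add(y, 1)     # still a LoggedArray, never converted to ndarray

print(type(z).__name__, z.calls)
```

The key point is that `np.exp(x)` hands control to the wrapper instead of coercing it to a NumPy array, so the wrapper decides what type comes back.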

As with all large changes in NumPy this was accomplished through a
collaboration of many people. PR in [numpy/numpy #8247](https://github.com/numpy/numpy/pull/8247).
