---
blogpost: true
date: Aug 2, 2018
title: Dask Development Log
tags: Programming, Python, scipy, dask
---

_This work is supported by [Anaconda Inc](http://anaconda.com)_

To increase transparency I'm trying to blog more often about the current work
going on around Dask and related projects. Nothing here is ready for
production. This blogpost is written in haste, so refined polish should not be
expected.

Over the last two weeks we've seen activity in the following areas:

1.  An experimental Actor solution for stateful processing
1.  Machine learning experiments with hyper-parameter selection and parameter
    servers.
1.  Development of more preprocessing transformers
1.  Statistical profiling of the distributed scheduler's internal event loop
    thread and internal optimizations
1.  A new release of dask-yarn
1.  A new narrative on dask-stories about modelling mobile networks
1.  Support for LSF clusters in dask-jobqueue
1.  Test suite cleanup for intermittent failures

### Stateful processing with Actors

Some advanced workloads want to directly manage and mutate state on workers. A
task-based framework like Dask can be forced into this kind of workload using
long-running tasks, but it's an uncomfortable experience. To address this
we've been adding an experimental Actors framework to Dask alongside the
standard task-scheduling system. This reduces latency, removes
scheduling overhead, and lets users directly mutate state on a
worker, but loses niceties like resilience and diagnostics.

The idea to adopt Actors was shamelessly stolen from the [Ray Project](http://ray.readthedocs.io/en/latest/) :)

Work for Actors is happening in [dask/distributed #2133](https://github.com/dask/distributed/pull/2133).

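```python
from dask.distributed import Client

client = Client()  # connect to a cluster (here, a local one)

class Counter:
    """A stateful object that lives on a single worker."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n

# Create the Counter on a worker and get back an Actor proxy
counter = client.submit(Counter, actor=True).result()

# Method calls run on the worker that holds the state and return a future
future = counter.increment()
future.result()  # 1
```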

## Machine learning experiments

### Hyper parameter optimization on incrementally trained models

Many Scikit-Learn-style estimators feature a `partial_fit` method that enables
incremental training on batches of data. This is particularly well suited for
systems like Dask array or Dask dataframe, which are built from many batches of
Numpy arrays or Pandas dataframes. It's a nice fit because all of the
algorithmic work is already done in Scikit-Learn. Dask just has to
move models to the data and call Scikit-Learn (or other
machine learning libraries that follow the fit/transform/predict/score API). This
approach provides a clean interface between parallelism developers and machine
learning developers.
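
As a rough illustration of the `partial_fit` pattern, here is a minimal
pure-Python sketch. The `SGDLine` class and the synthetic batches are
hypothetical stand-ins, not Scikit-Learn or Dask code; a real workflow would
use an estimator such as Scikit-Learn's `SGDClassifier` and the chunks of a
Dask array.

```python
import random


class SGDLine:
    """Hypothetical 1-D linear model y = w*x + b, trained by SGD."""

    def __init__(self, lr=0.05):
        self.lr = lr
        self.w = 0.0
        self.b = 0.0

    def partial_fit(self, X, y):
        # One pass of stochastic gradient descent over a single batch
        for xi, yi in zip(X, y):
            error = (self.w * xi + self.b) - yi
            self.w -= self.lr * error * xi
            self.b -= self.lr * error
        return self


rng = random.Random(0)

# Stand-ins for the chunks of a Dask array: 20 batches of 50 points each,
# all drawn from the line y = 3x + 1
batches = []
for _ in range(20):
    X = [rng.uniform(-1, 1) for _ in range(50)]
    y = [3.0 * xi + 1.0 for xi in X]
    batches.append((X, y))

model = SGDLine()
for X, y in batches:
    # Each call continues from the state left by the previous batch
    model.partial_fit(X, y)
```

The model only ever sees one batch at a time, so the full dataset never needs
to fit in memory on a single machine.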

However, this training is inherently sequential because the model only trains
on one batch of data at a time. We're leaving a lot of processing power on the
table.

To address this we can combine incremental training with hyper-parameter
selection and train several models on the same data at the same time. This is
often required anyway, and lets us be more efficient with our computation.

However, there are many ways to do incremental training with hyper-parameter
selection, and the right algorithm likely depends on the problem at hand.
This is an active field of research, so it's hard for a general project like
Dask to pick and implement a single method that works well for everyone. We
will probably need a handful of methods, each with various options.

To support experimentation here we've been building some lower-level
tooling that we think will be helpful in a variety of cases. This accepts a
policy from the user as a Python function that receives scores from recent
evaluations and returns how much further to progress on each set of
hyper-parameters before checking in again. This allows us to model a few
common situations, like random search with early stopping conditions, successive
halving, and variations of those, without having to write any Dask code:

- [dask/dask-ml #288](https://github.com/dask/dask-ml/pull/288)
- [Notebook showing a few approaches](https://gist.github.com/mrocklin/4c95bd26d15281d82e0bf2d27632e294)
- [Another notebook showing convergence](https://gist.github.com/stsievert/c675b3a237a60efbd01dcb112e29115b)

This work is done by [Scott Sievert](http://github.com/stsievert) and me.

<img src="https://user-images.githubusercontent.com/1320475/43540881-7184496a-95b8-11e8-975a-96c2f17ee269.png"
     width="70%"
     alt="Successive halving and random search">
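
To make the policy-function idea more concrete, here is a small pure-Python
sketch of a successive-halving policy together with a toy driver loop. The
names `successive_halving`, `toy_score`, and `run`, and the exact shape of the
arguments, are assumptions made for illustration. They are not the interface
in the pull request above.

```python
def successive_halving(scores, step=1):
    """Keep the better-scoring half of the models and stop the rest.

    scores: dict mapping model id -> list of scores, most recent last
    returns: dict mapping model id -> number of additional training steps
    """
    if len(scores) <= 1:
        return {}  # nothing left to compare; stop training
    ranked = sorted(scores, key=lambda k: scores[k][-1], reverse=True)
    survivors = ranked[: max(1, len(ranked) // 2)]
    return {k: step for k in survivors}


def toy_score(quality, n_calls):
    # Hypothetical learning curve: approaches `quality` as training proceeds
    return quality * (1 - 0.5 ** n_calls)


def run(policy, qualities):
    """Toy driver: train, score, then ask the policy what to do next."""
    calls = {k: 0 for k in qualities}
    history = {k: [] for k in qualities}
    todo = {k: 1 for k in qualities}  # every model starts with one step
    while todo:
        for k, n in todo.items():
            calls[k] += n
            history[k].append(toy_score(qualities[k], calls[k]))
        # Only models that are still active are shown to the policy
        todo = policy({k: history[k] for k in todo})
    return calls, history


calls, history = run(successive_halving, {"a": 0.6, "b": 0.9, "c": 0.7, "d": 0.8})
```

Swapping in a different policy function (for example, one that stops a model
once its score plateaus) changes the search strategy without touching the
driver.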

### Parameter Servers

To improve the speed of training large models, [Scott
Sievert](https://github.com/stsievert) has been using Actors (mentioned above)
to develop simple examples for parameter servers. These are helping to
identify and motivate performance and diagnostic improvements
within Dask itself:

<script src="https://gist.github.com/ff8a1df9300a82f15a2704e913469522.js"></script>

These parameter servers manage the communication of models produced by
different workers, and leave the computation to the underlying deep learning
library. This is ongoing work.
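
For readers unfamiliar with the pattern, the following is a minimal sketch of
what a parameter server class might look like. It is not the implementation in
the gist above. The `ParameterServer` class, its `pull`/`push` methods, and
the toy gradient are all hypothetical, and the loop runs locally rather than
on a cluster.

```python
class ParameterServer:
    """Holds the current parameters and applies gradient updates to them."""

    def __init__(self, n_params, lr=0.1):
        self.params = [0.0] * n_params
        self.lr = lr
        self.n_updates = 0

    def pull(self):
        # Workers fetch a copy of the current parameters
        return list(self.params)

    def push(self, grad):
        # Workers send back a gradient; the server applies it in place
        self.params = [p - self.lr * g for p, g in zip(self.params, grad)]
        self.n_updates += 1
        return self.n_updates


def toy_gradient(params, target=(1.0, -2.0)):
    # Hypothetical worker computation: gradient of 0.5 * ||params - target||^2
    return [p - t for p, t in zip(params, target)]


# Local simulation. With Dask, the server could instead be created as an
# Actor, following the Counter example earlier in this post:
#     server = client.submit(ParameterServer, 2, actor=True).result()
# in which case each method call would return a future.
server = ParameterServer(n_params=2)
for _ in range(100):
    grad = toy_gradient(server.pull())
    server.push(grad)
```

The server only stores and updates state. The gradient computation stays with
whatever library the workers use.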

### Dataframe Preprocessing Transformers

We've started to orient some of the Dask-ML work around case studies. Our
first, written by [Scott Sievert](https://github.com/stsievert), uses the
Criteo dataset for ads. It's a good example of a combined dense/sparse dataset
that can be somewhat large (around 1TB). The first challenge we're running
into is preprocessing. This has led to a few preprocessing improvements:

- [Label Encoder supports Pandas Categorical dask/dask-ml #310](https://github.com/dask/dask-ml/pull/310)
- [Add Imputer with mean and median strategies dask/dask-ml #11](https://github.com/dask/dask-ml/pull/11)
- [Add OneHotEncoder dask/dask-ml #313](https://github.com/dask/dask-ml/pull/313)
- [Add Hashing Vectorizer dask/dask-ml #122](https://github.com/dask/dask-ml/pull/122)
- [Add ColumnTransformer dask/dask-ml #315](https://github.com/dask/dask-ml/pull/315)

Some of these are also based on improved dataframe handling features in the
upcoming 0.20 release of Scikit-Learn.

This work is done by
[Roman Yurchak](https://github.com/dask/dask-ml/pull/122),
[James Bourbeau](https://github.com/jrbourbeau),
[Daniel Severo](https://github.com/daniel-severo), and
[Tom Augspurger](https://github.com/TomAugspurger).

### Profiling the main thread

Profiling concurrent code is hard. Traditional profilers like cProfile become
confused when control passes between all of the different coroutines. This
means that we haven't done a very comprehensive job of profiling and tuning the
distributed scheduler and workers. Statistical profilers, on the other hand,
tend to do a bit better. We've taken the statistical profiler that we usually
use on Dask worker threads (available in the dashboard on the "Profile" tab)
and applied it to the central administrative threads running the Tornado
event loop as well. This has highlighted a few issues that we weren't able to
spot before, and should hopefully result in reduced overhead in future
releases.

- [dask/distributed #2144](https://github.com/dask/distributed/pull/2144)
- [stackoverflow.com/questions/51582394/which-functions-are-free-when-profiling-tornado-asyncio](https://stackoverflow.com/questions/51582394/which-functions-are-free-when-profiling-tornado-asyncio)

<img src="https://user-images.githubusercontent.com/306380/43368136-4574f46c-930d-11e8-9d5b-6f4b4f6aeffe.png"
     width="70%"
     alt="Profile of event loop thread">
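
For context, the core idea of a statistical profiler can be sketched in a few
lines of standard-library Python. This is an illustrative toy, not the
profiler in the pull request above: a background thread periodically samples
the stack of a target thread using the CPython-specific
`sys._current_frames()`, and the helper names and `busy_work` workload are
hypothetical.

```python
import sys
import threading
import time
from collections import Counter


def sample_thread(thread_id, counts, stop, interval=0.005):
    """Periodically record which function the target thread is executing."""
    while not stop.is_set():
        frame = sys._current_frames().get(thread_id)
        if frame is not None:
            counts[frame.f_code.co_name] += 1
        time.sleep(interval)


def busy_work(duration):
    """Hypothetical workload: spin for `duration` seconds."""
    deadline = time.monotonic() + duration
    total = 0
    while time.monotonic() < deadline:
        total += 1
    return total


def profile(func, *args):
    """Run func(*args) in this thread while a sampler thread watches it."""
    counts = Counter()
    stop = threading.Event()
    sampler = threading.Thread(
        target=sample_thread,
        args=(threading.get_ident(), counts, stop),
        daemon=True,
    )
    sampler.start()
    try:
        func(*args)
    finally:
        stop.set()
        sampler.join()
    return counts


counts = profile(busy_work, 0.3)
```

Because the sampler only looks at whichever frame is active at each tick, it
does not need to track control passing between coroutines. Functions that run
for longer simply collect more samples.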

### New release of Dask-Yarn

There is a new release of [Dask-Yarn](http://dask-yarn.readthedocs.io/en/latest)
and the underlying library for managing YARN jobs,
[Skein](https://jcrist.github.io/skein/). These include a number of bug fixes
and improved concurrency primitives for YARN applications. The new features are
documented [here](https://jcrist.github.io/skein/key-value-store.html), and were
implemented in [jcrist/skein #40](https://github.com/jcrist/skein/pull/40).

This work was done by [Jim Crist](https://jcrist.github.io/).

### Support for LSF clusters in Dask-Jobqueue

[Dask-jobqueue](http://dask-jobqueue.readthedocs.io/en/latest/) supports using
Dask on traditional HPC cluster managers like SGE, SLURM, PBS, and others.
We've recently [added support for LSF clusters](http://dask-jobqueue.readthedocs.io/en/latest/generated/dask_jobqueue.LSFCluster.html#dask_jobqueue.LSFCluster).

Work was done in [dask/dask-jobqueue #78](https://github.com/dask/dask-jobqueue/pull/78) by [Ray Bell](https://github.com/raybellwaves).

### New Dask Story on mobile networks

The [Dask Stories](http://dask-stories.readthedocs.io/en/latest/)
repository holds narratives about how people use Dask.
[Sameer Lalwani](https://www.linkedin.com/in/lalwanisameer/)
recently added a story about using Dask to
[model mobile communication networks](http://dask-stories.readthedocs.io/en/latest/network-modeling.html).
It's worth a read.

### Test suite cleanup

The dask.distributed test suite has been suffering from intermittent failures
recently. These tests fail very infrequently, so they are hard to catch when
first written, but they show up later when unrelated PRs run the test suite on
continuous integration. They add friction to the development process and are
expensive to track down (testing distributed systems is hard).

We're taking a bit of time this week to track these down. Progress here:

- [dask/distributed #2146](https://github.com/dask/distributed/pull/2146)
- [dask/distributed #2152](https://github.com/dask/distributed/pull/2152)
