This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.
Summary#
Dask just released version 0.14.0. This release contains some significant internal changes as well as the usual set of increased API coverage and bug fixes. This blogpost outlines some of the major changes since the last release on January 27th, 2017.
Structural sharing of graphs between collections
Refactor communications system
Many small dataframe improvements
Top-level persist function
You can install the new version using Conda or Pip:
conda install -c conda-forge dask distributed
or
pip install dask[complete] distributed --upgrade
Communications System#
Dask communicates over TCP sockets. It uses Tornado’s IOStreams to handle non-blocking communication, framing, etc. We’ve run into some performance issues with Tornado when moving large amounts of data. Some of this has been improved upstream in Tornado directly, but we still want the ability to optionally drop Tornado’s byte-handling communication stack in the future. This is especially important as Dask gets used in institutions with faster and more exotic interconnects (supercomputers). We’ve been asked a few times to support other transport mechanisms like MPI.
The first step (and probably the hardest) was to make Dask’s communication system pluggable, so that we can use different communication options without significant source-code changes. We managed this a month ago and now it is possible to add other transports to Dask relatively easily. TCP remains the only real choice today, though there is also an experimental ZeroMQ option (which provides little-to-no performance benefit over TCP) as well as a fully in-memory option in development.
For users, the main difference you’ll see is that tcp:// is now prepended in many places. For example:
$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO - Scheduler at: tcp://192.168.1.115:8786
...
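Because the transport is selected from the address scheme, connecting a client works just as before, now with the scheme spelled out. A minimal sketch (the address below is a placeholder for your scheduler's actual location):

from dask.distributed import Client

# The tcp:// scheme explicitly selects the default TCP transport
client = Client('tcp://192.168.1.115:8786')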
Variety of Dataframe Changes#
As usual, the Pandas API has been more fully covered by community contributors. Some representative changes include the following:
Support non-uniform categoricals: We no longer need to do a full pass through the data when categorizing a column. Instead, we categorize each partition independently (even if partitions have different category values) and then unify these categories only when necessary.
df['x'] = df['x'].astype('category') # this is now fast
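As a small sketch of what this enables (the data here is illustrative), partitions can hold entirely different category values without triggering a full pass over the dataset:

import pandas as pd
import dask.dataframe as dd

# Two partitions that end up with different category sets
pdf = pd.DataFrame({'x': ['a', 'a', 'b', 'c', 'c', 'd']})
df = dd.from_pandas(pdf, npartitions=2)

# Each partition is categorized independently; the categories are
# unified across partitions only when an operation needs them
df['x'] = df['x'].astype('category')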
Groupby cumulative reductions
df.groupby('x').cumsum()
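For example, a minimal sketch (column names and data are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'x': ['a', 'b', 'a', 'b'], 'y': [1, 2, 3, 4]})
df = dd.from_pandas(pdf, npartitions=2)

# Cumulative sum within each group, evaluated lazily
df.groupby('x').cumsum().compute()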
Support appending to Parquet collections
df.to_parquet('/path/to/foo.parquet', append=True)
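A hedged sketch of the write-then-append workflow (the path and dataframes are illustrative, and the appended data is assumed to match the existing schema):

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({'x': [1, 2, 3]}), npartitions=1)
df_new = dd.from_pandas(pd.DataFrame({'x': [4, 5, 6]}), npartitions=1)

df.to_parquet('/path/to/foo.parquet')                   # creates the dataset
df_new.to_parquet('/path/to/foo.parquet', append=True)  # adds new partitions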
A new string and HTML representation of dask.dataframes. Typically Pandas prints dataframes on the screen by rendering the first few rows of data. However, because Dask.dataframes are lazy we don’t have this data, and so we typically render some metadata about the dataframe instead.
>>> df  # version 0.13.0
dd.DataFrame<make-ti..., npartitions=366, divisions=(Timestamp('2000-01-01 00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D'), Timestamp('2000-01-03 00:00:00', freq='D'), ..., Timestamp('2000-12-31 00:00:00', freq='D'), Timestamp('2001-01-01 00:00:00', freq='D'))>
This rendering, while informative, can be improved. Now we render dataframes as a Pandas dataframe, but place metadata in the dataframe instead of the actual data.
>>> df  # version 0.14.0
Dask DataFrame Structure:
                       x        y      z
npartitions=366
2000-01-01       float64  float64  int64
2000-01-02           ...      ...    ...
...                  ...      ...    ...
2000-12-31           ...      ...    ...
2001-01-01           ...      ...    ...
Dask Name: make-timeseries, 366 tasks
Additionally, this renders nicely as an HTML table in a Jupyter notebook.
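The dataframe in these examples can be reproduced with the demo timeseries generator; a hedged sketch (the exact signature may differ between versions):

import dask.dataframe as dd

# One row and one partition per day for the year 2000,
# matching the repr shown above
df = dd.demo.make_timeseries(start='2000-01-01', end='2001-01-01',
                             dtypes={'x': float, 'y': float, 'z': int},
                             freq='1D', partition_freq='1D')
df  # in a notebook this displays the HTML metadata table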
Variety of Distributed System Changes#
There have also been a wide variety of changes to the distributed system. I’ll include a representative sample here to give a flavor of what has been happening:
Ensure first-come-first-served priorities when dealing with multiple clients
Send small amounts of data through Channels. Channels are a way for multiple clients/users connected to the same scheduler to publish and exchange data between themselves. Previously they only transmitted Futures (which could in turn point to larger data living on the cluster). However, we found that it was useful to communicate small bits of metadata as well, for example to signal progress or stopping criteria between clients collaborating on the same workloads. Now you can publish any msgpack-serializable data on Channels.
# Publishing Client
scores = client.channel('scores')
scores.append(123.456)

# Subscribing Client
scores = client.channel('scores')
while scores.data[-1] < THRESHOLD:
    ...  # continue working
We’re better at estimating the size in bytes of SciPy sparse matrices and Keras models. This allows Dask to make smarter choices about when it should and should not move data around for load balancing. Additionally, Dask can now also serialize Keras models.
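One way to see the improved estimates is through Dask's sizeof dispatch, which I believe is the mechanism consulted here; a minimal sketch:

import scipy.sparse
from dask.sizeof import sizeof

# A 1000x1000 CSR matrix with roughly 1% nonzero entries
m = scipy.sparse.random(1000, 1000, density=0.01, format='csr')

# The estimate accounts for the underlying data and index arrays,
# which informs the scheduler's load-balancing decisions
sizeof(m)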
To help people deploying on clusters that have a shared network file system (as is often the case in scientific or academic institutions), the scheduler and workers can now communicate connection information using the --scheduler-file keyword:

dask-scheduler --scheduler-file /path/to/scheduler.json
dask-worker --scheduler-file /path/to/scheduler.json
dask-worker --scheduler-file /path/to/scheduler.json

>>> client = Client(scheduler_file='/path/to/scheduler.json')
Previously we needed to communicate the address of the scheduler, which could be challenging when we didn’t know on which node the scheduler would be run.
Other#
There are a number of smaller details not mentioned in this blogpost. For more information, visit the changelogs and documentation.
Additionally, a great deal of Dask work over the last month has happened outside of these core Dask repositories.
You can install or upgrade using Conda or Pip:
conda install -c conda-forge dask distributed
or
pip install dask[complete] distributed --upgrade
Acknowledgements#
Since the last 0.13.0 release on January 27th, the following developers have contributed to the dask/dask repository:
Antoine Pitrou
Chris Barber
Daniel Davis
Elmar Ritsch
Erik Welch
jakirkham
Jim Crist
John Crickett
jspreston
Juan Luis Cano Rodríguez
kayibal
Kevin Ernst
Markus Gonser
Matthew Rocklin
Martin Durant
Nir
Sinhrks
Talmaj Marinc
Vlad Frolov
Will Warner
And the following developers have contributed to the dask/distributed repository:
Antoine Pitrou
Ben Schreck
bmaisonn
Brett Naul
Demian Wassermann
Israel Saeta Pérez
John Crickett
Joseph Crail
Malte Gerken
Martin Durant
Matthew Rocklin
Min RK
strets123