This work is supported by Anaconda Inc. and the Data Driven Discovery Initiative from the Moore Foundation.
I’m pleased to announce the release of Dask version 0.17.0. This is a major release with new features, breaking changes, and stability improvements. This blog post outlines notable changes since the 0.16.0 release on November 21st.
You can conda install Dask:
conda install dask -c conda-forge
or pip install from PyPI:
pip install dask[complete] --upgrade
Full changelogs are available here:
Some notable changes follow.
Deprecations
Removed the dask.dataframe.rolling_* methods, which were previously deprecated both in dask.dataframe and in pandas. These are replaced with the rolling.* namespace.
We’ve generally stopped maintaining the dask-ec2 project, which launches Dask clusters on Amazon’s EC2 using Salt. We generally recommend Kubernetes instead, on Amazon’s EC2 as well as on Google and Azure.
Internal state of the distributed scheduler has changed significantly. This may affect advanced users who were inspecting this state for debugging or diagnostics.
Task Ordering
As Dask is applied to more complex problems from more domains, we continually run into cases where its current heuristics do not perform optimally. This release includes a rewrite of our static task prioritization heuristics, which improves Dask’s ability to traverse complex computations in a way that keeps memory use low.
To aid debugging, we also integrated these heuristics into the GraphViz-style plots produced by the visualize method.
import dask.array as da
x = da.random.random(...)
...
x.visualize(color='order', cmap='RdBu')  # color each task by its execution order; needs graphviz
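The rewritten heuristics are too involved to reproduce here, but the intuition behind memory-friendly ordering can be shown on a toy graph. The sketch below is not Dask’s algorithm: the graph and the functions breadth_first, depth_first, and peak_memory are hypothetical, written only to compare how many intermediate results two execution orders keep in memory at the same time.

```python
from collections import deque

# Hypothetical toy graph: each key maps to the keys it depends on.
# Four loads, four processing steps, then a tree reduction.
graph = {
    "l0": [], "l1": [], "l2": [], "l3": [],
    "p0": ["l0"], "p1": ["l1"], "p2": ["l2"], "p3": ["l3"],
    "s01": ["p0", "p1"], "s23": ["p2", "p3"],
    "s": ["s01", "s23"],
}


def dependents_of(graph):
    """Invert the graph: map each key to the keys that depend on it."""
    dependents = {k: set() for k in graph}
    for key, inputs in graph.items():
        for dep in inputs:
            dependents[dep].add(key)
    return dependents


def breadth_first(graph):
    """Kahn's algorithm with a FIFO queue: start every ready task first."""
    dependents = dependents_of(graph)
    waiting = {k: len(v) for k, v in graph.items()}
    ready = deque(sorted(k for k, n in waiting.items() if n == 0))
    order = []
    while ready:
        key = ready.popleft()
        order.append(key)
        for dep in sorted(dependents[key]):
            waiting[dep] -= 1
            if waiting[dep] == 0:
                ready.append(dep)
    return order


def depth_first(graph):
    """Post-order DFS from the outputs: finish one branch before starting the next."""
    dependents = dependents_of(graph)
    order, seen = [], set()

    def visit(key):
        if key in seen:
            return
        seen.add(key)
        for dep in sorted(graph[key]):
            visit(dep)
        order.append(key)

    for key in sorted(k for k, v in dependents.items() if not v):
        visit(key)
    return order


def peak_memory(graph, order):
    """Most results held at once; a result is freed once all its dependents ran."""
    remaining = {k: len(v) for k, v in dependents_of(graph).items()}
    held, peak = set(), 0
    for key in order:
        held.add(key)  # inputs of `key` are still held while it runs
        peak = max(peak, len(held))
        for dep in graph[key]:
            remaining[dep] -= 1
            if remaining[dep] == 0:
                held.discard(dep)
    return peak


for name, fn in [("breadth-first", breadth_first), ("depth-first", depth_first)]:
    order = fn(graph)
    print(f"{name:14} peak={peak_memory(graph, order)}  {order}")
```

On this graph the breadth-first order holds five results at its peak and the depth-first order holds four. The gap grows with wider graphs, which is why the order in which a scheduler walks a graph matters for memory.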
Nested Joblib
Dask supports parallelizing Scikit-Learn by extending Scikit-Learn’s underlying library for parallelism, Joblib. This allows Dask to distribute some Scikit-Learn algorithms across a cluster just by wrapping them with a context manager.
This relationship has been strengthened, with particular attention paid to nesting one parallel computation within another, such as when you train a parallel estimator, like RandomForest, within another parallel computation, like GridSearchCV. Previously this would spawn too many threads or processes and generally oversubscribe the hardware.
Thanks to recent joint development in both Joblib and Dask, these situations can now be resolved efficiently by handing them off to Dask, providing speedups even on a single machine:
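from sklearn.externals import joblib  # joblib as vendored by scikit-learn at the time
import distributed.joblib  # registers the 'dask' joblib backend
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

client = Client()
X, y = make_classification(n_samples=1000, random_state=0)

est = RandomForestClassifier(n_jobs=-1)  # inner parallel estimator
gs = GridSearchCV(est, {'n_estimators': [50, 100]}, n_jobs=-1)  # outer parallel search; example grid
with joblib.parallel_backend('dask'):
    gs.fit(X, y)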
See Tom Augspurger’s recent post with more details about this work:
Thanks to Tom Augspurger, Jim Crist, and Olivier Grisel who did most of this work.
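A back-of-the-envelope count shows why nesting oversubscribes hardware. The numbers are illustrative assumptions: an 8-core machine, with both GridSearchCV and RandomForestClassifier set to n_jobs=-1 so that each level starts one worker per core. The nested_workers helper is hypothetical.

```python
def nested_workers(outer_jobs, inner_jobs):
    """Workers alive at once if every outer worker starts its own inner pool."""
    return outer_jobs * inner_jobs


cores = 8  # assumed machine size, for illustration only

# Outer level: a parallel search (e.g. GridSearchCV) with one worker per core.
# Inner level: each of those fits a parallel estimator (e.g. a random forest)
# that also asks for one worker per core.
total = nested_workers(outer_jobs=cores, inner_jobs=cores)
oversubscription = total / cores

print(total, oversubscription)  # 64 8.0
```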
Scheduler Internal Refactor
The distributed scheduler has been significantly refactored to change it from a forest of dictionaries:
priority = {'a': 1, 'b': 2, 'c': 3}
dependencies = {'a': {'b'}, 'b': {'c'}, 'c': set()}
nbytes = {'a': 1000, 'b': 1000, 'c': 28}
To a bunch of objects:
tasks = {'a': Task('a', priority=1, nbytes=1000, dependencies=...),
         'b': Task('b', priority=2, nbytes=1000, dependencies=...),
         'c': Task('c', priority=3, nbytes=28, dependencies=set())}
(There is much more state than what is listed above, but hopefully the examples above are clear.)
There were two motivations for this:
We wanted to try out Cython and PyPy, for which objects like this might be more effective than dictionaries.
We believe that this is probably a bit easier for developers new to the scheduler to understand. With so many separate state dictionaries, it was hard to discover where a given piece of state lived.
Goal one did not work out. We have not yet been able to make the scheduler significantly faster under Cython or PyPy with this new layout, and there is even a slight memory increase with these changes. However, we have been happy with the results in code readability, and we hope that others find this useful as well.
Thanks to Antoine Pitrou, who did most of the work here.
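To make the before-and-after concrete, here is a runnable sketch that converts the dictionary layout into objects. Task and to_objects are hypothetical names for illustration (distributed’s real per-task class, TaskState, carries far more state), and the dependents back-links are added here only to show how objects can point at each other directly instead of through key strings.

```python
from dataclasses import dataclass, field

# The old layout: one dictionary per attribute, all keyed by task name.
priority = {'a': 1, 'b': 2, 'c': 3}
dependencies = {'a': {'b'}, 'b': {'c'}, 'c': set()}
nbytes = {'a': 1000, 'b': 1000, 'c': 28}


@dataclass(eq=False)  # identity-based equality and hashing, so Tasks can live in sets
class Task:
    """Hypothetical per-task record; the real scheduler tracks far more state."""
    key: str
    priority: int
    nbytes: int
    dependencies: set = field(default_factory=set)  # Task objects this task needs
    dependents: set = field(default_factory=set)    # Task objects that need this task


def to_objects(priority, dependencies, nbytes):
    """Convert the forest of dictionaries into one Task object per key."""
    tasks = {key: Task(key, priority[key], nbytes[key]) for key in priority}
    # Second pass: link objects directly rather than storing key strings.
    for key, deps in dependencies.items():
        for dep in deps:
            tasks[key].dependencies.add(tasks[dep])
            tasks[dep].dependents.add(tasks[key])
    return tasks


tasks = to_objects(priority, dependencies, nbytes)
b = tasks['b']
print(b.priority, b.nbytes, [t.key for t in b.dependencies], [t.key for t in b.dependents])
# prints: 2 1000 ['c'] ['a']
```

All the state about task 'b' is now reachable from a single object, which is the readability benefit described above.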
User Priorities
You can now submit tasks with different priorities.
def f(x):  # stand-in for any user function
    return x + 1
x = client.submit(f, 1, priority=10)   # Higher priority preferred
y = client.submit(f, 2, priority=-10)  # Lower priority happens later
To be clear, Dask has always had priorities; they just weren’t easily user-settable. Higher priorities are given precedence. The default priority for all tasks is zero. You can also submit priorities for collections (like arrays and dataframes):
df = df.persist(priority=5) # give this computation higher priority.
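The ordering rule described above (higher numbers first, zero by default) can be modeled with a small heap. This is a toy sketch, not the distributed scheduler’s code, and it makes no claim about how the real scheduler combines user priorities with its own static ordering: ReadyQueue is a hypothetical class, and breaking ties by submission order is an assumption made for the example.

```python
import heapq
import itertools


class ReadyQueue:
    """Toy ready-queue: higher user priority runs first; ties run in submission order."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # submission order, used as a tie-breaker

    def push(self, key, priority=0):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), key))

    def pop(self):
        _, _, key = heapq.heappop(self._heap)
        return key

    def __len__(self):
        return len(self._heap)


q = ReadyQueue()
q.push("background", priority=-10)
q.push("default")              # priority defaults to zero
q.push("urgent", priority=10)
q.push("also-default")

order = [q.pop() for _ in range(len(q))]
print(order)  # ['urgent', 'default', 'also-default', 'background']
```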
Acknowledgements
The following people contributed to the dask/dask repository since the 0.16.0 release on November 14th:
Albert DeFusco
Apostolos Vlachopoulos
castalheiro
James Bourbeau
Jon Mease
Ian Hopkinson
Jakub Nowacki
Jim Crist
John A Kirkham
Joseph Lin
Keisuke Fujii
Martijn Arts
Martin Durant
Matthew Rocklin
Markus Gonser
Nir
Rich Signell
Roman Yurchak
S. Andrew Sheppard
sephib
Stephan Hoyer
Tom Augspurger
Uwe L. Korn
Wei Ji
Xander Johnson
The following people contributed to the dask/distributed repository since the 1.20.0 release on November 14th:
Alexander Ford
Antoine Pitrou
Brett Naul
Brian Broll
Bruce Merry
Cornelius Riemenschneider
Daniel Li
Jim Crist
Kelvin Yang
Matthew Rocklin
Min RK
rqx
Russ Bubley
Scott Sievert
Tom Augspurger
Xander Johnson