Summary#
We’ve sufficiently aligned Dask DataFrame and cuDF to get groupby aggregations like the following to work well.
df.groupby('x').y.mean()
This post describes the kind of work we had to do as a model for future development.
Plan#
As outlined in a previous post, Dask, Pandas, and GPUs: first steps, our plan to produce distributed GPU dataframes was to combine Dask DataFrame with cuDF. In particular, we had to:
change Dask DataFrame so that it would parallelize not just around the Pandas DataFrames that it works with today, but around anything that looked enough like a Pandas DataFrame
change cuDF so that it would look enough like a Pandas DataFrame to fit within the algorithms in Dask DataFrame
Changes#
On the Dask side this mostly meant:
Replacing isinstance(df, pd.DataFrame) checks with is_dataframe_like(df) checks (after defining suitable is_dataframe_like / is_series_like / is_index_like functions)
Avoiding some more exotic functionality in Pandas, and instead trying to use more common functionality that we can expect to be in most DataFrame implementations
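To make the idea of a dataframe-like check concrete, here is a minimal sketch of one. The attribute lists and the ToyFrame class are assumptions chosen for illustration and are not necessarily what Dask itself checks.

```python
def is_dataframe_like(obj):
    """Return True if obj exposes a DataFrame-shaped interface.

    The attribute names below are illustrative assumptions.
    """
    typ = type(obj)
    return (
        # Methods we expect on the class itself
        all(hasattr(typ, name) for name in ("groupby", "head", "merge", "mean"))
        # Metadata we expect on the instance
        and all(hasattr(obj, name) for name in ("dtypes", "columns"))
        # Series-like objects have a single dtype and a name; frames do not
        and not any(hasattr(typ, name) for name in ("name", "dtype"))
    )


class ToyFrame:
    """Hypothetical non-Pandas container with a DataFrame-shaped API."""

    def __init__(self, data):
        self._data = data
        self.columns = list(data)
        self.dtypes = {key: type(values[0]).__name__ for key, values in data.items()}

    def groupby(self, key):
        raise NotImplementedError

    def head(self, n=5):
        raise NotImplementedError

    def merge(self, other):
        raise NotImplementedError

    def mean(self):
        raise NotImplementedError


frame = ToyFrame({"x": [1, 2], "y": [3.0, 4.0]})
print(is_dataframe_like(frame))      # ToyFrame is accepted without subclassing anything
print(is_dataframe_like([1, 2, 3]))  # a plain list is rejected
```

The point of checking behavior rather than type is that any library, cuDF included, can pass the check without inheriting from pd.DataFrame.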
On the cuDF side this meant making dozens of tiny changes to align the cuDF API with the Pandas API, and adding missing features.
Dask Changes:
Remove explicit pandas checks and provide cudf lazy registration #4359
Lazily register more cudf functions and move to backends file #4396
Avoid groupby.agg(callable) in groupby-var #4482 – this one is notable in that by simplifying our Pandas usage we actually got a significant speedup on the Pandas side.
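As a rough illustration of what avoiding a callable can look like for grouped variance, the sketch below computes it from three plain reductions per group (count, sum, and sum of squares) that combine across partitions. It is written in plain Python with hypothetical helper names, and is an assumption about the general approach rather than the code in that pull request.

```python
from collections import defaultdict
from statistics import variance


def partial_moments(rows):
    """Per-partition step: count, sum, and sum of squares for each key."""
    acc = defaultdict(lambda: [0, 0.0, 0.0])
    for key, value in rows:
        entry = acc[key]
        entry[0] += 1
        entry[1] += value
        entry[2] += value * value
    return acc


def combine(partials):
    """Add up the per-partition moments; every piece is a simple sum."""
    total = defaultdict(lambda: [0, 0.0, 0.0])
    for part in partials:
        for key, (n, s, s2) in part.items():
            entry = total[key]
            entry[0] += n
            entry[1] += s
            entry[2] += s2
    return total


def finalize_var(total, ddof=1):
    """Turn combined moments into a variance per key.

    This textbook formula can lose precision when the mean is large
    relative to the spread; it is used here only for clarity.
    """
    out = {}
    for key, (n, s, s2) in total.items():
        out[key] = (s2 - s * s / n) / (n - ddof) if n > ddof else float("nan")
    return out


# Two hypothetical partitions of (key, value) rows
partitions = [
    [("a", 1.0), ("b", 2.0), ("a", 3.0)],
    [("a", 5.0), ("b", 6.0)],
]
result = finalize_var(combine(partial_moments(p) for p in partitions))
print(result)
```

Because each step is only a count or a sum, it needs nothing beyond the most common groupby reductions, which is the kind of functionality any DataFrame implementation is likely to provide.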
cuDF Changes:
I don’t really expect anyone to go through all of those issues, but my hope is that by skimming over the issue titles people will get a sense for the kinds of changes we’re making here. It’s a large number of small things.
Also, kudos to Thomson Comer who solved most of the cuDF issues above.
There are still some pending issues#
Square Root #1055, needed for groupby-std
cuDF needs multi-index support for columns #483, needed for:
groupby.agg({'x': ['sum', 'mean'], 'y': ['min', 'max']})
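To see why this call depends on multi-index columns, it may help to look at what Pandas returns for it. The small frame below and its column names are made up for illustration, and the example uses Pandas only, since that is the behavior cuDF would have to match.

```python
import pandas as pd

# Hypothetical data: one grouping column and two value columns
df = pd.DataFrame({
    "g": ["a", "a", "b"],
    "x": [1, 2, 3],
    "y": [10, 20, 30],
})

out = df.groupby("g").agg({"x": ["sum", "mean"], "y": ["min", "max"]})

# The result columns are two-level: (input column, aggregation name)
print(isinstance(out.columns, pd.MultiIndex))
print(list(out.columns))
```

Each output column is identified by a pair such as ('x', 'sum'), so a library without hierarchical column labels has no direct way to represent this result.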
But things mostly work#
Generally, things work pretty well today:
In [1]: import dask_cudf
In [2]: df = dask_cudf.read_csv('yellow_tripdata_2016-*.csv')
In [3]: df.groupby('passenger_count').trip_distance.mean().compute()
Out[3]: <cudf.Series nrows=10 >
In [4]: _.to_pandas()
Out[4]:
0 0.625424
1 4.976895
2 4.470014
3 5.955262
4 4.328076
5 3.079661
6 2.998077
7 3.147452
8 5.165570
9 5.916169
dtype: float64
Experience#
First, most of this work was handled by the cuDF developers (which may be evident from the relative lengths of the issue lists above). When we started this process it felt like a never-ending stream of tiny issues. We weren’t able to see the next set of issues until we had finished the current set. Fortunately, most of them were pretty easy to fix, and the work seemed to get a bit easier as we went on.
Second, lots of things other than groupby-aggregations now work as a result of the changes above. From the perspective of someone accustomed to Pandas, the cuDF library is starting to feel more reliable. We hit missing functionality less frequently when using cuDF on other operations.
What’s next?#
More recently we’ve been working on the various join/merge operations in Dask DataFrame, like indexed joins on a sorted column, joins between large and small dataframes (a common special case), and so on. Getting these algorithms from the mainline Dask DataFrame codebase to work with cuDF is producing a similar set of issues to what we saw above with groupby-aggregations, but so far the list is much smaller. We hope this trend holds as we move on to other sets of functionality, like I/O, time-series operations, rolling windows, and so on.
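As an illustration of the large-and-small special case, one common approach is to join every partition of the large dataframe against the whole small dataframe, so no data has to be shuffled between partitions. The sketch below uses plain Pandas frames as stand-ins for partitions; the function name and the data are hypothetical, and this is not taken from the Dask codebase.

```python
import pandas as pd


def broadcast_merge(large_partitions, small, on):
    """Join each partition of the large frame against the whole small frame."""
    return [part.merge(small, on=on, how="inner") for part in large_partitions]


# Hypothetical large dataframe, already split into partitions
large_partitions = [
    pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]}),
    pd.DataFrame({"id": [3, 1], "value": [30.0, 40.0]}),
]
# Hypothetical small lookup table that fits comfortably on every worker
small = pd.DataFrame({"id": [1, 2, 3], "label": ["a", "b", "c"]})

joined = pd.concat(broadcast_merge(large_partitions, small, on="id"), ignore_index=True)
print(joined)
```

Each per-partition step is an ordinary merge call, so how well this carries over to another DataFrame library depends mostly on how closely that library's merge matches the Pandas one.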