I have been using dask to speed up some larger-scale analyses. Dask is a great drop-in replacement for parallelizing analyses built on the PyData stack, such as numpy, pandas and even scikit-learn.
However, I recently found an interesting case where using the same syntax in dask.dataframe as in a pandas DataFrame does not achieve what I want. So in this post, I will document how to overcome it for my future self.
As usual, let's import all the useful libraries:
```python
import pandas as pd
import dask.dataframe as dd
```
I will use the famous titanic dataset as an example to show how dask can behave unexpectedly with groupby + apply operations.
```python
titanic = pd.read_csv('http://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv')
titanic.head()
```
| | Survived | Pclass | Name | Sex | Age | Siblings/Spouses Aboard | Parents/Children Aboard | Fare |
|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 3 | Mr. Owen Harris Braund | male | 22.0 | 1 | 0 | 7.2500 |
| 1 | 1 | 1 | Mrs. John Bradley (Florence Briggs Thayer) Cum... | female | 38.0 | 1 | 0 | 71.2833 |
| 2 | 1 | 3 | Miss. Laina Heikkinen | female | 26.0 | 0 | 0 | 7.9250 |
| 3 | 1 | 1 | Mrs. Jacques Heath (Lily May Peel) Futrelle | female | 35.0 | 1 | 0 | 53.1000 |
| 4 | 0 | 3 | Mr. William Henry Allen | male | 35.0 | 0 | 0 | 8.0500 |
I will illustrate the problem by counting how many passengers survived in each age and sex group, using the following function:
```python
def count_survival(d):
    '''
    Sum the survivors in one group and return the count as a single-row DataFrame.
    '''
    return pd.DataFrame({'survived':[d.Survived.sum()]})
```
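To make clear what `count_survival` hands back to `apply`, here is a minimal sketch on a small hypothetical toy group (the values are made up, not rows from the titanic file). The function is repeated so the snippet runs on its own; for any group it returns a one-row DataFrame whose index is just `0`.

```python
import pandas as pd

def count_survival(d):
    '''Sum the Survived column of one group and wrap it in a one-row DataFrame.'''
    return pd.DataFrame({'survived': [d.Survived.sum()]})

# Hypothetical toy data standing in for a single Age/Sex group.
group = pd.DataFrame({
    'Survived': [1, 0, 1],
    'Age': [22.0, 22.0, 22.0],
    'Sex': ['male', 'male', 'male'],
})

out = count_survival(group)
print(out)  # one row, index 0, column 'survived' holding 2
```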
A regular pandas way to do it would be:
```python
titanic \
    .groupby(['Age','Sex'])\
    .apply(count_survival)\
    .head()
```
| Age | Sex | | survived |
|---|---|---|---|
| 0.42 | male | 0 | 1 |
| 0.67 | male | 0 | 1 |
| 0.75 | female | 0 | 2 |
| 0.83 | male | 0 | 2 |
| 0.92 | male | 0 | 1 |
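The unnamed index level holding `0` comes from the one-row DataFrame that `count_survival` returns for each group. A minimal sketch on hypothetical toy data shows that dropping that level gives the same counts as a plain `groupby(...)['Survived'].sum()`; the sum-based line is a swapped-in alternative used only for comparison, not the approach this post is about.

```python
import pandas as pd

def count_survival(d):
    return pd.DataFrame({'survived': [d.Survived.sum()]})

# Hypothetical toy data, not the real titanic rows.
toy = pd.DataFrame({
    'Age': [0.75, 0.75, 22.0, 22.0, 22.0],
    'Sex': ['female', 'female', 'male', 'male', 'female'],
    'Survived': [1, 1, 0, 1, 1],
})

# groupby + apply: index levels are Age, Sex and the inner 0 from the one-row frame.
via_apply = toy.groupby(['Age', 'Sex']).apply(count_survival)

# Drop the inner level so the index lines up with a plain aggregation.
flat = via_apply.droplevel(-1)['survived']

# Built-in aggregation computing the same per-group count.
via_sum = toy.groupby(['Age', 'Sex'])['Survived'].sum()
```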
Let's convert the pandas DataFrame to a dask DataFrame and do the same:
```python
dask_job = titanic \
    .pipe(dd.from_pandas, npartitions=24)\
    .groupby(['Age','Sex']) \
    .apply(count_survival, meta={'survived':'f8'})
```
This will not return any result until we call `dask_job.compute()`, but dask also includes a `visualize` method to show the task graph:
```python
dask_job.visualize()
```
The resulting task graph is much more complicated than I expected, because the groupby + apply triggers a data shuffle behind the scenes. As suggested by the dask documentation, this issue can be resolved by setting a groupby key as the index:
```python
dask_job = titanic \
    .set_index('Age')\
    .pipe(dd.from_pandas, npartitions=24)\
    .groupby(['Age','Sex']) \
    .apply(count_survival, meta={'survived':'f8'})
dask_job.visualize()
```