'Dask division issue after groupby
I am working on a project where I need to group by several columns depending on the task and I have unknown division issues with dask because of this.
Here is a sample of the problem
import pandas as pd
import dask.dataframe as dd
import numpy as np
df = pd.DataFrame({"col1": np.random.randint(1, 100, 100000), "col2": np.random.randint(101, 200, 100000), "col3": np.random.uniform(0, 4, 100000)})
ddf = dd.from_pandas(df, npartitions=100)
ddf = ddf.set_index("col1")
ddf["col2_sum"] = ddf.groupby("col1")["col3"].transform("sum", meta=('x', 'float64')) # works
print(ddf.compute())
This works because I am grouping by an indexed column. However,
ddf["col2_sum2"] = ddf.groupby("col2")["col3"].transform("sum", meta=('x', 'float64'))
This doesn't work because of ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
I have tried to solve this this way
ddf_new = ddf[["col2", "col3"]].set_index("col2")
ddf_new["col2_sum2"] = ddf_new.groupby("col2")["col3"].transform("sum", meta=('x', 'float64'))
ddf_new = ddf_new.drop(columns=["col3"])
ddf = ddf.merge(ddf_new, on=["col2"], how="outer") # works but expensive round trip
print(ddf.compute())
But this is very expensive dask merges. Is there a better way of solving this problem
Solution 1:[1]
The solution you created seems reasonable, I would make one improvement (if this is feasible with actual data): if ddf_new is computed, then it becomes a pandas df, so the merge of ddf and ddf_new becomes a lot faster as there is less data to shuffle around.
Update: also to avoid sending the pandas df from workers to client and back, you could do a ddf_new = client.compute(ddf_new) and pass around just the future (reference to the computed pandas df).
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | SultanOrazbayev |
