Dask division issue after groupby

I am working on a project where I need to group by different columns depending on the task, and because of this I keep running into unknown-division issues with dask.

Here is a small example that reproduces the problem:


import pandas as pd
import dask.dataframe as dd
import numpy as np

df = pd.DataFrame({
    "col1": np.random.randint(1, 100, 100000),
    "col2": np.random.randint(101, 200, 100000),
    "col3": np.random.uniform(0, 4, 100000),
})

ddf = dd.from_pandas(df, npartitions=100)

ddf = ddf.set_index("col1")

ddf["col2_sum"] = ddf.groupby("col1")["col3"].transform("sum", meta=('x', 'float64')) # works
print(ddf.compute())

This works because I am grouping by an indexed column. However,


ddf["col2_sum2"] = ddf.groupby("col2")["col3"].transform("sum", meta=('x', 'float64'))

This doesn't work and fails with ValueError: Not all divisions are known, can't align partitions. Please use `set_index` to set the index.
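
If I inspect the intermediate result, its divisions are indeed unknown (a quick check on the same ddf as above):

res = ddf.groupby("col2")["col3"].transform("sum", meta=('x', 'float64'))
print(res.known_divisions)  # False, so dask cannot align it back onto ddf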

I have tried to solve it this way:


ddf_new = ddf[["col2", "col3"]].set_index("col2")

ddf_new["col2_sum2"] = ddf_new.groupby("col2")["col3"].transform("sum", meta=('x', 'float64'))

ddf_new = ddf_new.drop(columns=["col3"])

ddf = ddf.merge(ddf_new, on=["col2"], how="outer") # works but expensive round trip

print(ddf.compute())

But this requires a very expensive dask merge. Is there a better way of solving this problem?



Solution 1:[1]

The solution you created seems reasonable. I would make one improvement (if this is feasible with the actual data): if ddf_new is computed, it becomes a pandas DataFrame, so the merge of ddf and ddf_new becomes a lot faster, as there is much less data to shuffle around.
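
A minimal sketch of that idea, assuming ddf_new is small enough to hold in client memory:

# materialise the aggregated frame as a plain pandas DataFrame
pdf_new = ddf_new.compute()

# merging a dask dataframe against a pandas dataframe broadcasts the pandas
# frame to the partitions instead of shuffling both sides
ddf = ddf.merge(pdf_new.reset_index(), on=["col2"], how="outer")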

Update: also, to avoid sending the pandas df from the workers to the client and back, you could do ddf_new = client.compute(ddf_new) and pass around just the future (a reference to the computed pandas df).
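
Roughly, that could look like the sketch below; the dd.from_delayed step (and the meta handling) is my own glue to get the future back into the dask graph, and it assumes a dask.distributed Client is available:

from dask.distributed import Client

client = Client()  # or connect to an existing scheduler

# turn col2 back into a regular column so the later merge is straightforward
ddf_new = ddf_new.reset_index()
meta = ddf_new._meta  # empty pandas frame describing columns/dtypes

# compute on the cluster and keep only a future; the pandas frame stays on the workers
ddf_new_future = client.compute(ddf_new)

# wrap the future back into a single-partition dask dataframe so it can be merged
# without round-tripping through the client
ddf_new_small = dd.from_delayed([ddf_new_future], meta=meta)

ddf = ddf.merge(ddf_new_small, on=["col2"], how="outer")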

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: SultanOrazbayev