How to set output_sizes and output_dtypes when using dask='parallelized' in xarray's apply_ufunc?
I'm working with xarray and trying to apply a function (Phen) along the 'time' dimension:
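import numpy as np
import pandas as pd
from scipy import interpolate

def Phen(array, dates, nGS):
    df = pd.DataFrame(data={'x': dates, 'y': array})
    df['x'] = pd.to_datetime(df['x'], format='%Y-%m-%d')
    df['doy'] = df['x'].dt.dayofyear
    # Fill NaN gaps linearly before averaging by day of year
    df['y2'] = df['y'].interpolate()
    # Mean of y2 per day of year (selecting y2 first avoids averaging the datetime column)
    grouped = df.groupby('doy')['y2'].mean()
    x = grouped.index.values
    y = grouped.values
    # Regular day-of-year sample points; the step depends on nGS
    xi = np.arange(1, 366, round(365 / nGS))
    interpolator = interpolate.interp1d(x, y)
    yi = interpolator(xi)
    return yi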
The function returns an array of shape (nGS,), where nGS is an integer (here nGS = 22).
I have a subset of my DataArray (named subset) with the following characteristics:
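One caveat on that shape: Phen's output length is really the length of xi, i.e. len(np.arange(1, 366, round(365/nGS))), which equals nGS for nGS = 22 but not for every value. A quick numpy check (n_samples is a hypothetical helper name, not part of the original code):

```python
import numpy as np

def n_samples(nGS):
    # Hypothetical helper: length of xi exactly as Phen builds it
    step = round(365 / nGS)
    return len(np.arange(1, 366, step))

for nGS in (12, 22, 24):
    print(nGS, n_samples(nGS))
# 12 13
# 22 22
# 24 25
```

So if nGS is ever changed, any size declared for the output dimension has to match this value rather than nGS itself.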
subset = data.load()
subset
<xarray.DataArray (time: 191, y: 3, x: 3)>
array([[[0.10421417, 0.09748246, 0.07814603],
[0.0791336 , 0.0755756 , 0.08128342],
[0.05619302, 0.06154257, 0.07437155]],
[[ nan, nan, nan],
[ nan, nan, nan],
[ nan, nan, nan]],
...,
[[0.10877109, 0.10753239, 0.0845692 ],
[0.08107001, 0.06364887, 0.08600463],
[0.06155076, 0.07387387, 0.07401033]],
[[0.11185646, 0.10244605, 0.082422 ],
[0.07255252, 0.07440122, 0.08029867],
[0.06888834, 0.07161374, 0.0796656 ]]])
Coordinates:
time (time) datetime64[ns] 2013-04-12T14:35:27.984873 ... 2...
y (y) float64 6.325e+06 6.325e+06 6.325e+06
x (x) float64 3.136e+05 3.136e+05 3.136e+05
spatial_ref () int32 32719
Then I run apply_ufunc to apply the function along the 'time' dimension:
output = xr.apply_ufunc(Phen,
                        subset,
                        subset.time,
                        nGS,
                        input_core_dims=[['time'], ['time'], []],
                        output_core_dims=[['new']],
                        exclude_dims=set(("time",)),
                        vectorize=True)
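Roughly speaking, vectorize=True makes apply_ufunc wrap the function in numpy.vectorize with a generalized-ufunc signature derived from the core dimensions, after moving the core dimension ('time') to the last axis. The numpy-only sketch below mimics that per-pixel loop; resample is a hypothetical stand-in for Phen, and the signature string is written by hand (xarray generates its own internal dimension names). Note that the length of the new axis only becomes known once the function has run, which is why dask, which builds its task graph before computing anything, needs that length declared up front.

```python
import numpy as np

def resample(series, dates, n):
    # Hypothetical stand-in for Phen: n evenly spaced, linearly interpolated samples
    positions = np.linspace(0, series.size - 1, n)
    return np.interp(positions, np.arange(series.size), series)

rng = np.random.default_rng(0)
values = rng.random((3, 3, 191))  # (y, x, time): core dim 'time' moved to the last axis
dates = np.arange(191)            # placeholder for the time coordinate

# "(t),(t),()->(n)": two 1-D inputs along time plus a scalar, one new 1-D output axis
vectorized = np.vectorize(resample, signature="(t),(t),()->(n)")
out = vectorized(values, dates, 22)
print(out.shape)  # (3, 3, 22)
```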
which leads to:
output
<xarray.DataArray (y: 3, x: 3, new: 22)>
array([[[0.09018036, 0.09018036, 0.09218437, 0.09218437, 0.09418838,
0.09418838, 0.09619238, 0.09819639, 0.10220441, 0.10420842,
0.10621242, 0.10621242, 0.10621242, 0.10821643, 0.10821643,
0.10821643, 0.10821643, 0.10621242, 0.10220441, 0.10220441,
0.10220441, 0.10220441],
[0.07815631, 0.08016032, 0.08016032, 0.08216433, 0.08416834,
0.08617234, 0.08817635, 0.09218437, 0.09619238, 0.1002004 ,
0.1002004 , 0.10220441, 0.10220441, 0.10220441, 0.10220441,
0.10220441, 0.1002004 , 0.1002004 , 0.09819639, 0.09418838,
0.09218437, 0.09218437],
[0.06613226, 0.06613226, 0.06813627, 0.06813627, 0.07014028,
0.07214429, 0.0761523 , 0.07815631, 0.08817635, 0.08817635,
0.09018036, 0.09018036, 0.08817635, 0.08817635, 0.08817635,
0.08617234, 0.08416834, 0.08216433, 0.08216433, 0.08016032,
0.08016032, 0.07815631]],
...
[[0.03006012, 0.03006012, 0.03006012, 0.03006012, 0.05210421,
0.05611222, 0.05811623, 0.05811623, 0.06012024, 0.06012024,
0.06012024, 0.06212425, 0.06212425, 0.06212425, 0.06012024,
0.06012024, 0.05811623, 0.05811623, 0.05611222, 0.05410822,
0.04208417, 0.04008016],
[0.04609218, 0.04809619, 0.06212425, 0.06212425, 0.06212425,
0.06212425, 0.06412826, 0.07014028, 0.07214429, 0.0741483 ,
0.0741483 , 0.0741483 , 0.0741483 , 0.07214429, 0.07214429,
0.07014028, 0.07014028, 0.06813627, 0.06012024, 0.06012024,
0.05811623, 0.05611222],
[0.06412826, 0.06412826, 0.06412826, 0.06613226, 0.06613226,
0.06813627, 0.07014028, 0.07214429, 0.0761523 , 0.07815631,
0.08016032, 0.08016032, 0.08016032, 0.07815631, 0.07815631,
0.0761523 , 0.0761523 , 0.0741483 , 0.0741483 , 0.0741483 ,
0.0741483 , 0.06813627]]])
Coordinates:
y (y) float64 6.325e+06 6.325e+06 6.325e+06
x (x) float64 3.136e+05 3.136e+05 3.136e+05
spatial_ref () int32 32719
The output xarray object is correct.
However, I'm trying to parallelize this by splitting my subset into chunks:
chunks = data.chunk({'x': 1, 'y': 1, 'time': len(data.time)})
chunks
<xarray.DataArray (time: 191, y: 3, x: 3)>
         Array          Chunk
Bytes    13.43 kiB      1.49 kiB
Shape    (191, 3, 3)    (191, 1, 1)
Count    3075 Tasks     9 Chunks
Type     float64        numpy.ndarray
But when I run apply_ufunc with dask='parallelized', I keep getting errors related to the output_sizes and output_dtypes arguments:
output2 = xr.apply_ufunc(Phen,
                         chunks,
                         chunks.time,
                         nGS,
                         input_core_dims=[['time'], ['time'], []],
                         output_core_dims=[['new']],
                         exclude_dims=set(("time",)),
                         vectorize=True,
                         dask='parallelized',
                         output_sizes={'dim': 1, 'size': 22},
                         output_dtypes=[float])
which raises the following error:
ValueError: dimension 'dim' in 'output_sizes' must correspond to output_core_dims
I've tried passing integers and lists of integers for 'dim' and 'size', but neither fixes the problem.
I would be very grateful if anyone has a clue how to fix this issue.
Regards
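The error message says that every key in output_sizes must be the name of an output core dimension; here that is 'new', mapped to its length. A minimal sketch, assuming a recent xarray where output_sizes is passed inside dask_gufunc_kwargs (the version in the question accepted it as a direct argument). To stay self-contained it uses random placeholder data and a hypothetical stand-in phen_like instead of Phen; swapping Phen back in should only require its own imports.

```python
import numpy as np
import pandas as pd
import xarray as xr

def phen_like(array, dates, nGS):
    # Hypothetical stand-in for Phen: resample the series to nGS points
    x = np.arange(array.size)
    return np.interp(np.linspace(0, array.size - 1, nGS), x, array)

nGS = 22
time = pd.date_range("2013-04-12", periods=191, freq="8D")  # placeholder dates
data = xr.DataArray(
    np.random.default_rng(0).random((191, 3, 3)),  # placeholder values
    dims=("time", "y", "x"),
    coords={"time": time},
)
# 'time' is a core dimension, so it must be a single chunk (-1 = whole axis)
chunks = data.chunk({"x": 1, "y": 1, "time": -1})

output2 = xr.apply_ufunc(
    phen_like,
    chunks,
    chunks.time,
    nGS,
    input_core_dims=[["time"], ["time"], []],
    output_core_dims=[["new"]],
    exclude_dims={"time"},
    vectorize=True,
    dask="parallelized",
    output_dtypes=[float],
    # Key: name of the output core dim; value: its length
    dask_gufunc_kwargs={"output_sizes": {"new": nGS}},
)
result = output2.compute()
print(result.dims, result.shape)
```

The declared length must match what the function actually returns for each pixel; a mismatch is only discovered when the chunks are computed.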
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
