How to set output_sizes and output_dtypes when using dask='parallelized' in xarray's apply_ufunc?

I'm working with xarray and trying to apply a function (Phen) along the 'time' dimension:

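import numpy as np
import pandas as pd
from scipy import interpolate

def Phen(array, dates, nGS):
    # Build a per-pixel time series and fill gaps by linear interpolation
    df = pd.DataFrame(data={'x':dates, 'y':array})
    df['x'] = pd.to_datetime(df['x'], format='%Y-%m-%d')
    df['doy'] = df['x'].dt.dayofyear
    df['y2'] = df['y'].interpolate()
    # Average the values by day of year
    grouped = df.groupby('doy').mean()['y2']
    x = grouped.index.values
    y = grouped.values
    # Resample onto a regular day-of-year grid
    xi = np.arange(1,366, round(365/nGS))
    interpolator = interpolate.interp1d(x, y)
    yi = interpolator(xi)
    return yi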

The function returns an array of shape (nGS,), where nGS is an integer (here nGS = 22).

I have a subset of my xarray DataArray (named subset) with the following characteristics:
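As a quick check on that output length, here is a minimal sketch that counts the points in the xi grid. It uses plain Python range, which steps the same way as np.arange for integer arguments; the helper name grid_length is hypothetical and not part of the original code.

```python
def grid_length(nGS):
    """Number of points in np.arange(1, 366, round(365 / nGS))."""
    step = round(365 / nGS)  # built-in round returns an int here
    return len(range(1, 366, step))

print(grid_length(22))  # 22, matches nGS
print(grid_length(12))  # 13, one more than nGS
```

For nGS = 22 the grid does have 22 points, but this sketch suggests that is not guaranteed for every nGS, so any size declared for the output dimension should be the actual grid length.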

[1] subset = data.load()
[2] subset
xarray.DataArray   time: 191  y: 3  x: 3
array([[[0.10421417, 0.09748246, 0.07814603],
        [0.0791336 , 0.0755756 , 0.08128342],
        [0.05619302, 0.06154257, 0.07437155]],

       [[       nan,        nan,        nan],
        [       nan,        nan,        nan],
        [       nan,        nan,        nan]],

       ...,

       [[0.10877109, 0.10753239, 0.0845692 ],
        [0.08107001, 0.06364887, 0.08600463],
        [0.06155076, 0.07387387, 0.07401033]],

       [[0.11185646, 0.10244605, 0.082422  ],
        [0.07255252, 0.07440122, 0.08029867],
        [0.06888834, 0.07161374, 0.0796656 ]]])
Coordinates:
time         (time)    datetime64[ns]    2013-04-12T14:35:27.984873 ... 2...
y            (y)       float64           6.325e+06 6.325e+06 6.325e+06
x            (x)       float64           3.136e+05 3.136e+05 3.136e+05
spatial_ref  ()        int32             32719

Then, I'm running apply_ufunc to execute the function along the 'time' dimension:

output = xr.apply_ufunc(Phen, 
                        subset, 
                        subset.time,
                        nGS,
                        input_core_dims=[['time'], ['time'], []],
                        output_core_dims=[['new']],
                        exclude_dims=set(("time",)),
                        vectorize=True)

which leads to:

[1] output
xarray.DataArray   y: 3   x: 3   new: 22
array([[[0.09018036, 0.09018036, 0.09218437, 0.09218437, 0.09418838,
         0.09418838, 0.09619238, 0.09819639, 0.10220441, 0.10420842,
         0.10621242, 0.10621242, 0.10621242, 0.10821643, 0.10821643,
         0.10821643, 0.10821643, 0.10621242, 0.10220441, 0.10220441,
         0.10220441, 0.10220441],
        [0.07815631, 0.08016032, 0.08016032, 0.08216433, 0.08416834,
         0.08617234, 0.08817635, 0.09218437, 0.09619238, 0.1002004 ,
         0.1002004 , 0.10220441, 0.10220441, 0.10220441, 0.10220441,
         0.10220441, 0.1002004 , 0.1002004 , 0.09819639, 0.09418838,
         0.09218437, 0.09218437],
        [0.06613226, 0.06613226, 0.06813627, 0.06813627, 0.07014028,
         0.07214429, 0.0761523 , 0.07815631, 0.08817635, 0.08817635,
         0.09018036, 0.09018036, 0.08817635, 0.08817635, 0.08817635,
         0.08617234, 0.08416834, 0.08216433, 0.08216433, 0.08016032,
         0.08016032, 0.07815631]],
...
       [[0.03006012, 0.03006012, 0.03006012, 0.03006012, 0.05210421,
         0.05611222, 0.05811623, 0.05811623, 0.06012024, 0.06012024,
         0.06012024, 0.06212425, 0.06212425, 0.06212425, 0.06012024,
         0.06012024, 0.05811623, 0.05811623, 0.05611222, 0.05410822,
         0.04208417, 0.04008016],
        [0.04609218, 0.04809619, 0.06212425, 0.06212425, 0.06212425,
         0.06212425, 0.06412826, 0.07014028, 0.07214429, 0.0741483 ,
         0.0741483 , 0.0741483 , 0.0741483 , 0.07214429, 0.07214429,
         0.07014028, 0.07014028, 0.06813627, 0.06012024, 0.06012024,
         0.05811623, 0.05611222],
        [0.06412826, 0.06412826, 0.06412826, 0.06613226, 0.06613226,
         0.06813627, 0.07014028, 0.07214429, 0.0761523 , 0.07815631,
         0.08016032, 0.08016032, 0.08016032, 0.07815631, 0.07815631,
         0.0761523 , 0.0761523 , 0.0741483 , 0.0741483 , 0.0741483 ,
         0.0741483 , 0.06813627]]])
Coordinates:
y              (y)    float64     6.325e+06 6.325e+06 6.325e+06
x              (x)    float64     3.136e+05 3.136e+05 3.136e+05
spatial_ref    ()     int32       32719

The output xarray object is correct.

However, I'm trying to parallelize this by splitting my data into chunks:

[] chunks = data.chunk({'x':1, 'y':1, 'time':len(data.time)})
[] chunks
xarray.DataArray time: 191  y: 3  x: 3
        Array       Chunk 
Bytes   13.43 kiB   1.49 kiB
Shape   (191, 3, 3) (191, 1, 1)
Count   3075 Tasks  9 Chunks
Type    float64     numpy.ndarray

but when I run apply_ufunc with dask='parallelized', I keep getting errors related to the output_sizes and output_dtypes arguments:

output2 = xr.apply_ufunc(Phen,
                         chunks,
                         chunks.time,
                         nGS,
                         input_core_dims=[['time'], ['time'], []],
                         output_core_dims=[['new']], 
                         exclude_dims=set(("time",)),
                         vectorize=True,
                         dask='parallelized',
                         output_sizes={'dim':1, 'size':22},
                         output_dtypes=[float])

which raises the following error:

ValueError: dimension 'dim' in 'output_sizes' must correspond to output_core_dims

I've tried changing the values of 'dim' and 'size' to integers and to lists of integers, but that doesn't fix the problem.
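Going by the error message, the keys of output_sizes are expected to be the dimension names listed in output_core_dims (here 'new'), mapped to their lengths, rather than the literal keys 'dim' and 'size'. A minimal sketch of that idea follows. It assumes a reasonably recent xarray where the sizes are passed through dask_gufunc_kwargs (older releases took output_sizes as a direct keyword), and it uses a hypothetical stand-in function, resample_to_n, instead of Phen so that it only depends on numpy, xarray and dask.

```python
import numpy as np
import xarray as xr

def resample_to_n(values, n):
    """Hypothetical stand-in for Phen: linearly resample a 1-D series to n points."""
    old = np.linspace(0.0, 1.0, values.size)
    new = np.linspace(0.0, 1.0, n)
    return np.interp(new, old, values)

n_out = 22

# Synthetic data with the same shape as the question's subset
data = xr.DataArray(
    np.random.default_rng(0).random((191, 3, 3)),
    dims=("time", "y", "x"),
)

# The core dimension 'time' must sit in a single chunk
chunked = data.chunk({"time": -1, "y": 1, "x": 1})

result = xr.apply_ufunc(
    resample_to_n,
    chunked,
    kwargs={"n": n_out},
    input_core_dims=[["time"]],
    output_core_dims=[["new"]],
    vectorize=True,
    dask="parallelized",
    # Key is the new core dimension's name, value is its length
    dask_gufunc_kwargs={"output_sizes": {"new": n_out}},
    output_dtypes=[float],
).compute()

print(result.sizes)
```

Applied to the call above, the corresponding change would be to replace {'dim':1, 'size':22} with {'new': 22}, either directly or inside dask_gufunc_kwargs depending on the installed xarray version.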

I would be deeply grateful if anyone has a clue on how to fix this issue.

Regards



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow