'MapCoder issue after updating Beam to 2.35
After updating Beam from 2.33 to 2.35, started getting this error:
def estimate_size(self, unused_value, nested=False):
estimate = 4 # 4 bytes for int32 size prefix
> for key, value in unused_value.items():
E AttributeError: 'list' object has no attribute 'items' [while running 'MyGeneric ParDo Job']
../../../python3.8/site-packages/apache_beam/coders/coder_impl.py:677: AttributeError
This is a method of MapCoderImpl. I don't know Beam enough to know when it's called.
Any thoughts on what might be causing it?
Solution 1:[1]
Beam uses Coder to encode and decode a PCollection. From the error message you got, Beam tried to use MapCoder to decode your input data. It expected a dict but received a list instead, hence the error.
Additionally, Beam uses the transform functions' type hints to infer the Coder for output PCollection's elements. My guess is that you might use a wrong return type for your function. Assuming you are implementing a DoFn's process, you yield a list in the function body, then you 'd see the error above if you define the function like this:
def process(self, element, **kwargs) -> List[Dict[A, B]]:
Beam sees the output element's type hint, Dict[A, B], and decides to use MapCoder. You might want to change the type hint to the one below, so that Beam could actually use ListCoder:
def process(self, element, **kwargs) -> List[List[Dict[A, B]]]:
More about benefits of using type hint is describe here.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
