'why creating a beam dataframe from beam.rows do not work but it does from beam.select

I have simplified my problem with the following two cases:

case 1:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

input_file = "gs://bucket/inputfile.parquet"

p = beam.Pipeline()

updates = (p
           | 'Read updates file %s' % (input_file) >> beam.io.ReadFromParquet(input_file)
           | 'To rows' >> beam.Map(lambda row: beam.Row(**row))
           )

df = to_dataframe(updates)
size = df.groupby("year").size()

p.run()

case 2:

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe

input_file = "gs://bucket/inputfile.parquet"

p = beam.Pipeline()

updates = (p
           | 'Read updates file %s' % (input_file) >> beam.io.ReadFromParquet(input_file)
           | 'To rows' >> beam.Select(id=lambda item: item["id"], area=lambda item: item["area"], year=lambda item: item["year"])
           )

df = to_dataframe(updates)
size = df.groupby("year").size()

p.run()

The only difference is how i create the rows. Why the second case works but the first one doesn't ???
According to this, they should be equivalent.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source