How to properly define and use a schema in Apache Beam Python?

The following code attempts to read data from BigQuery, define a schema for the rows, and apply an SqlTransform.

...

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]
    
beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)

def run(argv=None):
  ...
  with beam.Pipeline(options=pipeline_options) as p:
    query = '''
    SELECT
      colA, colB
    FROM `{}`
    '''.format(known_args.table)

    pcol = (p
        | 'read from BQ' >> beam.io.ReadFromBigQuery(
            gcs_location=known_args.execution_gcs_location,
            query=query,
            use_standard_sql=True)
        | 'ToRow' >> beam.Map(
            lambda x: RowSchema(**x)).with_output_types(RowSchema)
        | SqlTransform(
            """
            ...
            """)
        | beam.Map(print)
    )

...

However, it results in the following error:

  File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
    return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']
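
Reading the traceback, RowCoder encodes the str-typed field with a string coder whose encode method calls value.encode('utf-8'), so any non-string value fails exactly this way. A minimal reproduction outside the pipeline (assuming StrUtf8Coder is the coder involved, as the coders.py frame suggests):

import apache_beam as beam

# StrUtf8Coder.encode calls value.encode('utf-8'); feeding it an
# int reproduces the error seen in the 'ToRow' step.
beam.coders.StrUtf8Coder().encode(123)
# AttributeError: 'int' object has no attribute 'encode'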

Using the same structure, the following pipeline works correctly:

pcol = (p
    | "Create" >> beam.Create(
        [{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
    | SqlTransform(
        """
        ...
        """)
    | beam.Map(print)
    )

My suspicion is that example 2 ends up converting the input data into proper beam.pvalue.Row instances, while example 1 somehow does not.

How can I convert the input into Row objects for use with a static schema, assuming that was actually the problem?


The bigquery module also has built-in schemas, but only for BigQuery writes.

I've also checked the available examples, including one that uses dynamic schemas, which wouldn't work for this use case.
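
For context, a minimal sketch of that dynamic-schema style, where beam.Row infers the schema from its keyword arguments instead of a registered NamedTuple (illustrative only):

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (p
        | beam.Create([{'colA': 'a1', 'colB': 'b1'},
                       {'colA': 'a2', 'colB': None}])
        # beam.Row builds a schema'd element from keyword arguments,
        # with the schema inferred rather than declared up front.
        | 'ToRow' >> beam.Map(lambda x: beam.Row(**x))
        | beam.Map(print))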



Solution 1:[1]

Turns out colA was actually an INTEGER in BigQuery, so simply changing its type to int in the schema definition solves the problem.
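
A minimal sketch of the fix (only the field type changes; the rest of the pipeline stays as in the question):

import typing

import apache_beam as beam

class RowSchema(typing.NamedTuple):
    colA: int                   # was str; the BigQuery column is INTEGER
    colB: typing.Optional[str]

beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)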

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Nivaldo T