'list indices must be integers or slices, not str error in dataflow pipeline

I'm running the below pipeline to ingest the data to BQ from GCS.

import argparse
import logging
import re
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.coders.coders import Coder
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json

# schema_data = json.dumps(json.load(open("myschema.json")))
# table_schema = parse_table_schema_from_json(schema_data)
table_schema = parse_table_schema_from_json(json.dumps(json.load(open("myschema.json"))["schema"]))
# table_schema = parse_table_schema_from_json(json.load(open("myschema.json"))["schema"])

class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):
        
        values = re.split(",", re.sub('\r\n', '', re.sub('"', '',
                                                         string_input)))
        row = dict(
            zip(('abv', 'ibu', 'id', 'name', 'style', 'brewery_id','ounces'),
                values))
        return row


        
def run(argv=None):
    """The main function which creates the pipeline and runs it."""

    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
        'a file in a Google Storage Bucket.',
        default='gs://dfdfd/bee.csv')

    parser.add_argument('--output',
                        dest='output',
                        required=False,
                        help='Output BQ table to write results to.',
                        default='dataset.table')

    known_args, pipeline_args = parser.parse_known_args(argv)

    data_ingestion = DataIngestion()

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p
    | 'Read from a File' >> beam.io.ReadFromText(known_args.input,skip_header_lines=1)
    | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
             known_args.output,
             schema=table_schema,
             # Creates the table in BigQuery if it does not yet exist.
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             # Deletes all data in the BigQuery table before writing.
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I am getting error list indices must be integers or slices, not str at the line

parse_table_schema_from_json(json.dumps(json.load(open("myschema.json"))["schema"]))

i have schemafile as json and trying to use the json as schema in beam code to create a table if not already present else load the data to biqquery



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source