Pub/Sub to BigQuery load using Beam

I am trying to write a Beam pipeline that simply loads data from Pub/Sub into BigQuery. When I run the code from Colab it works perfectly, but when I create a template or run it on Dataflow I get errors such as names not being defined, plus the two below.

Error 1:

    Invalid JSON payload received. Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list. [while running 'Write Raw Data to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-60']

Error 2:

    line 179, in process UnboundLocalError: local variable 'job' referenced before assignment [while running 'Write Raw Data to Big Query-ptransform-63'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:922
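
The second error is ordinary Python behaviour when an `except` block refers to a name whose assignment raised: `job` is only bound once `load_table_from_json` returns, so if that call fails the handler cannot use `job`. The trace suggests the deployed code still referenced `job` inside the handler (for example via `print(job.errors)`). A minimal sketch of the pattern, with hypothetical names rather than the actual worker code:

            def write_rows(client, rows, table, cfg):
                try:
                    job = client.load_table_from_json(rows, table, job_config=cfg)
                    job.result()
                except Exception:
                    # If load_table_from_json itself raised (bad table reference,
                    # undefined name, ...), 'job' was never assigned, so this line
                    # raises UnboundLocalError and hides the original exception.
                    print(job.errors)

Catching the exception as `except Exception as e` and logging `e` (or initialising `job = None` before the `try`) keeps the real error visible.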

Code:

            import argparse
            import json
            import os
            import logging
            import pandas as pd
            import apache_beam as beam
            from google.cloud import bigquery, pubsub_v1
            from google.cloud.bigquery import table
            import google.cloud.storage.client as gcs
            from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
            import apache_beam.transforms.window as window


            serviceAccount = '/content/test-project-0c3a740e58c8.json'
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount

            logging.basicConfig(level=logging.INFO)
            logging.getLogger().setLevel(logging.INFO)

            project_id = "test-project"
            bigquery_dataset = "bq_load"
            bigquery_message_table = "message_table"
            bigquery_error_table = "Error_Table"
            bigquery_message_agg_table = "message_table_aggri"
            subscription_id = "topic_message-sub"

            INPUT_SUBSCRIPTION = pubsub_v1.SubscriberClient().subscription_path(project_id, subscription_id)
            WINDOW_SIZE = 100

            client = bigquery.Client(project = project_id)
            bq_dataset  = client.dataset(bigquery_dataset)
            bq_msg_table = bq_dataset.table(bigquery_message_table)
            bq_err_table = bq_dataset.table(bigquery_error_table)
            bq_agg_table = bq_dataset.table(bigquery_message_agg_table)
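
            # Note: in recent google-cloud-bigquery releases client.dataset() is
            # deprecated; bigquery.DatasetReference(project_id, bigquery_dataset)
            # or plain "project.dataset.table" strings are the suggested replacement.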

            message_table_schema = {
                      'name': 'MessageType',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'Location',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'Building',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'DateTime',
                      'type': 'DATETIME',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'ID',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'TID',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }

            error_table_schema = {
                      'name': 'Message',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'Error',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }

            table_schema_agri = {
                      'name': 'MessageType',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'Location',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'Building',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'ID',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'TID',
                      'type': 'STRING',
                      'mode': 'NULLABLE'
                      }, {
                      'name': 'no_of_rows',
                      'type': 'NUMERIC',
                      'mode': 'NULLABLE'
                      }
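
            # Note: each of the three schema definitions above is a tuple of dicts
            # (the comma-separated braces form a tuple), which is what
            # format_schema() below iterates over.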

            def format_schema(schema):
              formatted_schema = []
              column_list = []
              for row in schema:
                  formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
                  column_list.append(row['name'])
              return formatted_schema, set(column_list)

            message_job_config = bigquery.LoadJobConfig()
            message_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
            message_job_config.schema, message_column_list = format_schema(message_table_schema)
            print(message_column_list)

            error_job_config = bigquery.LoadJobConfig()
            error_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
            error_job_config.schema, error_column_list = format_schema(error_table_schema)
            print(error_column_list)

            agg_job_config = bigquery.LoadJobConfig()
            agg_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
            agg_job_config.schema, agg_column_list = format_schema(table_schema_agri)
            print(agg_column_list)

            class ProcessMessage(beam.DoFn):

                def process(self, message):
                  json_object = []
                  try:
                    message_str = message.decode("utf-8")
                    json_message = eval(message_str)

                    if isinstance(json_message, dict) and set(message_column_list).issubset(json_message.keys()):
                      print('{}'.format(("Formatted message", json_message)))
                      yield tuple(["Formatted message", json_message])
                    else:
                      print('{}'.format(("Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'})))
                      yield tuple(["Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'}])
                  except Exception as e:
                      print('{}'.format(("Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)})))
                      yield tuple(["Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)}])

            class WriteDataframeToBQ(beam.DoFn):
                
                def process(self, message):
                  global project_id 
                  global bigquery_dataset
                  global bigquery_message_table
                  global bigquery_error_table
                  global bigquery_message_agg_table
                  global subscription_id
                  global client
                  global bq_dataset
                  global bq_msg_table
                  global bq_err_table
                  global bq_agg_table
                  if message[0] == 'Formatted message':
                    try:
                      print('Now inserting:', message[1])
                      job = client.load_table_from_json(message[1], BQ_MSG_TABLE, job_config = message_job_config)
                      job.result()  # Waits for the job to complete.
                      print('Total {} messages successfully written to BigQuery Message Table'.format(len(message[1])))
                      
                      # Creating aggregation dataframe 
                      df = pd.DataFrame(message[1]) 
                      count_df= df.groupby(['MessageType','Location','Building','ID','TID']).size().reset_index(name='no_of_rows')
                      job = client.load_table_from_json(count_df.to_dict('records'), BQ_AGG_TABLE, job_config = agg_job_config)
                      job.result()  # Waits for the job to complete.
                      print('Total {} messages successfully written to BigQuery Aggregated Message Table'.format(str(count_df.shape[0])))
                     
                    except Exception as e:
                     # print(job.errors)
                      raise e
                  else:
                    
                    try:          
                      error_rows = [{"Message": json.dumps(m["Original_Message"]) if isinstance(m["Original_Message"], dict) else str(m["Original_Message"]), "Error": m["Error"]} for m in message[1]]
                      print('Now inserting:', error_rows)
                      job = client.load_table_from_json(error_rows, BQ_ERR_TABLE, job_config = error_job_config)
                      job.result()  # Waits for the job to complete.
                      print('Total {} messages successfully written to BigQuery Error Table'.format(len(error_rows)))
                    except Exception as e:
                      #print(job.errors)
                      raise e       
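
            # Side note: BQ_MSG_TABLE, BQ_AGG_TABLE and BQ_ERR_TABLE are not defined
            # anywhere in this module (the module-level names are bq_msg_table,
            # bq_agg_table and bq_err_table), so on the workers the load calls raise
            # a NameError before 'job' is assigned, which matches the
            # UnboundLocalError reported above. More generally, module-level objects
            # such as 'client', the job configs and the column lists only exist on
            # Dataflow workers if the main session is pickled
            # (save_main_session=True) or if they are created lazily, e.g. in a
            # DoFn.setup() method; see the sketch after this code listing.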

            def run():  


                pipeline_options = {
                  'project': 'test-project',
                  'runner': 'DataflowRunner',
                  'region': 'us-east1',
                  'staging_location': 'gs://bucket12345/tmp/',
                  'temp_location': 'gs://bucket12345/tmp/',
                  'template_location': 'gs://bucket12345/template/dataflow_test_template',
                  'streaming': True
                  }
                pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
                table = 'test-project:bq_load.message_table'

                with beam.Pipeline(options=pipeline_options) as p:
                   (     p
                      | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                          subscription=INPUT_SUBSCRIPTION, timestamp_attribute=None
                      )
                      | "Process Message" >> beam.ParDo(ProcessMessage())
                      | 'Fixed-size windows' >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
                      | beam.GroupByKey()
                      | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ())
                      | "Write Raw Data to BQ" >> beam.io.WriteToBigQuery(
                          table,custom_gcs_temp_location = 'gs://bucket12345/template/dataflow_test_template')
                  )

            run()
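
Two changes seem worth trying for the Dataflow/template runs, sketched below under the assumption that the rest of the module stays as posted. First, pickle the main session so that module-level names are defined on the workers (the BigQuery client itself may still need to be created on the worker, for example in a `DoFn.setup()` method). Second, end the pipeline after the ParDo that already issues the load jobs: the final WriteToBigQuery step receives whatever WriteDataframeToBQ emits rather than flat row dicts for the destination table, which is a plausible source of the "Invalid JSON payload" error. This is a minimal sketch, not a verified fix:

            def run():
                pipeline_options = PipelineOptions.from_dictionary({
                    'project': 'test-project',
                    'runner': 'DataflowRunner',
                    'region': 'us-east1',
                    'staging_location': 'gs://bucket12345/tmp/',
                    'temp_location': 'gs://bucket12345/tmp/',
                    'template_location': 'gs://bucket12345/template/dataflow_test_template',
                    'streaming': True
                })
                # Ship the main session (column lists, job configs, table names, ...)
                # to the workers so those names are defined there.
                pipeline_options.view_as(SetupOptions).save_main_session = True

                with beam.Pipeline(options=pipeline_options) as p:
                    (
                        p
                        | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                            subscription=INPUT_SUBSCRIPTION)
                        | "Process Message" >> beam.ParDo(ProcessMessage())
                        | "Fixed-size windows" >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
                        | beam.GroupByKey()
                        # WriteDataframeToBQ already writes via load jobs, so the
                        # pipeline ends here instead of piping its output into a
                        # second WriteToBigQuery step.
                        | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ())
                    )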


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow