Error in Dataflow job which uses a custom template

I have created a custom template file using the following code, and it creates the template successfully in Google Cloud Storage. But when I try to use that template in a Dataflow job to read data from Pub/Sub and load it into BigQuery, it throws the error below. The code basically creates a template that reads data from Pub/Sub and loads the aggregated data, the error data, and the raw (as-is) data into BigQuery.
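For context, each Pub/Sub message is expected to be a JSON object whose keys match the message table schema defined in the code below. A minimal example might look like this (only the field names come from message_table_schema; the values are hypothetical):

{"MessageType": "sensor_reading", "Location": "NYC", "Building": "HQ", "DateTime": "2022-01-01T10:00:00", "ID": "123", "TID": "456"}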

Error:
Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 537, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "<ipython-input-9-fee841532e96>", line 192, in process
  File "<ipython-input-9-fee841532e96>", line 187, in process
NameError: name 'client' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<ipython-input-9-fee841532e96>", line 192, in process
  File "<ipython-input-9-fee841532e96>", line 187, in process
NameError: name 'client' is not defined [while running 'Write Raw Data to Big Query-ptransform-63']
passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:921
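The NameError indicates that the worker cannot see the module-level client variable: globals created in the notebook/driver session are not automatically available on the Dataflow workers when the templated job runs. Below is a minimal sketch of one common workaround, creating the BigQuery client inside the DoFn's setup() method so each worker builds its own client; the class and project names are taken from the code below, and everything else is an assumption rather than a confirmed fix.

import apache_beam as beam
from google.cloud import bigquery

class WriteDataframeToBQ(beam.DoFn):

    def setup(self):
        # setup() runs once per DoFn instance on the worker, so the client
        # is created here instead of relying on the notebook-level global.
        self.client = bigquery.Client(project="test-project")

    def process(self, message):
        # ... the rest of the original process() body, using
        # self.client.load_table_from_json(...) instead of 'client' ...
        yield message

Alternatively, setting the save_main_session pipeline option (from SetupOptions) to True is sometimes used so that module-level state is pickled with the pipeline, though objects such as an open client do not always pickle cleanly.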




Code:




import argparse
import json
import os
import logging
import pandas as pd
import apache_beam as beam
from google.cloud import bigquery, pubsub_v1
from google.cloud.bigquery import table
import google.cloud.storage.client as gcs
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
import apache_beam.transforms.window as window


serviceAccount = '/content/test-project-0c3a740e58c8.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount

logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

project_id = "test-project"
bigquery_dataset = "bq_load"
bigquery_message_table = "message_table"
bigquery_error_table = "Error_Table"
bigquery_message_agg_table = "message_table_aggri"
subscription_id = "topic_message-sub"

INPUT_SUBSCRIPTION = pubsub_v1.SubscriberClient().subscription_path(project_id, subscription_id)
WINDOW_SIZE = 100

# BigQuery client and table references created at module (notebook) level
client = bigquery.Client(project = project_id)
bq_dataset  = client.dataset(bigquery_dataset)
bq_msg_table = bq_dataset.table(bigquery_message_table)
bq_err_table = bq_dataset.table(bigquery_error_table)
bq_agg_table = bq_dataset.table(bigquery_message_agg_table)

message_table_schema = {
          'name': 'MessageType',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'Location',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'Building',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'DateTime',
          'type': 'DATETIME',
          'mode': 'NULLABLE'
          }, {
          'name': 'ID',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'TID',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }

error_table_schema = {
          'name': 'Message',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'Error',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }

table_schema_agri = {
          'name': 'MessageType',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'Location',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'Building',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'ID',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'TID',
          'type': 'STRING',
          'mode': 'NULLABLE'
          }, {
          'name': 'no_of_rows',
          'type': 'NUMERIC',
          'mode': 'NULLABLE'
          }

def format_schema(schema):
  formatted_schema = []
  column_list = []
  for row in schema:
      formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
      column_list.append(row['name'])
  return formatted_schema, set(column_list)

message_job_config = bigquery.LoadJobConfig()
message_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
message_job_config.schema, message_column_list = format_schema(message_table_schema)
print(message_column_list)

error_job_config = bigquery.LoadJobConfig()
error_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
error_job_config.schema, error_column_list = format_schema(error_table_schema)
print(error_column_list)

agg_job_config = bigquery.LoadJobConfig()
agg_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
agg_job_config.schema, agg_column_list = format_schema(table_schema_agri)
print(agg_column_list)

class ProcessMessage(beam.DoFn):

    def process(self, message):
      json_object = []
      try:
        message_str = message.decode("utf-8")
        json_message = eval(message_str)

        if isinstance(json_message, dict) and set(message_column_list).issubset(json_message.keys()):
          print('{}'.format(("Formatted message", json_message)))
          yield tuple(["Formatted message", json_message])
        else:
          print('{}'.format(("Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'})))
          yield tuple(["Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'}])
      except Exception as e:
          print('{}'.format(("Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)})))
          yield tuple(["Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)}])

class WriteDataframeToBQ(beam.DoFn):
    
    def process(self, message):
      global project_id 
      global bigquery_dataset
      global bigquery_message_table
      global bigquery_error_table
      global bigquery_message_agg_table
      global subscription_id
      global client
      global bq_dataset
      global bq_msg_table
      global bq_err_table
      global bq_agg_table
      if message[0] == 'Formatted message':
        try:
          print('Now inserting:', message[1])
          job = client.load_table_from_json(message[1], bq_msg_table, job_config = message_job_config)
          job.result()  # Waits for the job to complete.
          print('Total {} messages successfully written to BigQuery Message Table'.format(len(message[1])))

          # Creating aggregation dataframe
          df = pd.DataFrame(message[1])
          count_df = df.groupby(['MessageType','Location','Building','ID','TID']).size().reset_index(name='no_of_rows')
          job = client.load_table_from_json(count_df.to_dict('records'), bq_agg_table, job_config = agg_job_config)
          job.result()  # Waits for the job to complete.
          print('Total {} messages successfully written to BigQuery Aggregated Message Table'.format(str(count_df.shape[0])))
         
        except Exception as e:
         # print(job.errors)
          raise e
      else:
        
        try:          
          error_rows = [{"Message": json.dumps(m["Original_Message"]) if isinstance(m["Original_Message"], dict) else str(m["Original_Message"]), "Error": m["Error"]} for m in message[1]]
          print('Now inserting:', error_rows)
          job = client.load_table_from_json(error_rows, bq_err_table, job_config = error_job_config)
          job.result()  # Waits for the job to complete.
          print('Total {} messages successfully written to BigQuery Error Table'.format(len(error_rows)))
        except Exception as e:
          #print(job.errors)
          raise e       

def run():

    pipeline_options = {
      'project': 'test-project',
      'runner': 'DataflowRunner',
      'region': 'us-east1',
      'staging_location': 'gs://bucket12345/tmp/',
      'temp_location': 'gs://bucket12345/tmp/',
      'template_location': 'gs://bucket12345/template/dataflow_test_template',
      'streaming': True
      }
    pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
    table = 'test-project:bq_load.message_table'

    with beam.Pipeline(options=pipeline_options) as p:
       (     p
          | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
              subscription=INPUT_SUBSCRIPTION, timestamp_attribute=None
          )
          | "Process Message" >> beam.ParDo(ProcessMessage())
          | 'Fixed-size windows' >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
          | beam.GroupByKey()
          | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ())
          | "Write Raw Data to BQ" >> beam.io.WriteToBigQuery(
              table,custom_gcs_temp_location = 'gs://bucket12345/template/dataflow_test_template')
      )

run()


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
