Error in Dataflow job which uses a custom template
I have created a custom template file using the following code, and it creates the template successfully in Google Cloud Storage. But when we try to use that template in a Dataflow job to read data from Pub/Sub and load it into BigQuery, it throws the error below. The code basically creates a template that reads messages from Pub/Sub and loads the aggregated data, the error data, and the as-is (raw) data into BigQuery.
Error:
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 537, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "<ipython-input-9-fee841532e96>", line 192, in process
File "<ipython-input-9-fee841532e96>", line 187, in process
NameError: name 'client' is not defined
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<ipython-input-9-fee841532e96>", line 192, in process
File "<ipython-input-9-fee841532e96>", line 187, in process
NameError: name 'client' is not defined [while running 'Write Raw Data to Big Query-ptransform-63']
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service_impl.cc:921
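The NameError points at the module-level client object: it is defined once in the notebook session, but when the templated job later runs on Dataflow workers those module-level objects are not recreated, so process() cannot see them. A common pattern (not necessarily the fix the original poster ended up with) is to create such per-worker resources in the DoFn's setup method; the following is only a minimal sketch of that pattern, with the project and table names assumed from the code below:

import apache_beam as beam
from google.cloud import bigquery

class WriteDataframeToBQ(beam.DoFn):
    def setup(self):
        # Runs once per DoFn instance on each worker, so the client exists
        # wherever process() executes (project id assumed from the question).
        self.client = bigquery.Client(project="test-project")

    def process(self, message):
        # Use self.client instead of a module-level global.
        job = self.client.load_table_from_json(
            message[1],
            "test-project.bq_load.message_table",
            job_config=bigquery.LoadJobConfig(
                source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON),
        )
        job.result()  # Waits for the load job to complete.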
Code:
import argparse
import json
import os
import logging
import pandas as pd
import apache_beam as beam
from google.cloud import bigquery, pubsub_v1
from google.cloud.bigquery import table
import google.cloud.storage.client as gcs
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
import apache_beam.transforms.window as window
serviceAccount = '/content/test-project-0c3a740e58c8.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
project_id = "test-project"
bigquery_dataset = "bq_load"
bigquery_message_table = "message_table"
bigquery_error_table = "Error_Table"
bigquery_message_agg_table = "message_table_aggri"
subscription_id = "topic_message-sub"
INPUT_SUBSCRIPTION = pubsub_v1.SubscriberClient().subscription_path(project_id, subscription_id)
WINDOW_SIZE = 100
client = bigquery.Client(project = project_id)
bq_dataset = client.dataset(bigquery_dataset)
bq_msg_table = bq_dataset.table(bigquery_message_table)
bq_err_table = bq_dataset.table(bigquery_error_table)
bq_agg_table = bq_dataset.table(bigquery_message_agg_table)
message_table_schema = {
'name': 'MessageType',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'Location',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'Building',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'DateTime',
'type': 'DATETIME',
'mode': 'NULLABLE'
}, {
'name': 'ID',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'TID',
'type': 'STRING',
'mode': 'NULLABLE'
}
error_table_schema = {
'name': 'Message',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'Error',
'type': 'STRING',
'mode': 'NULLABLE'
}
table_schema_agri = {
'name': 'MessageType',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'Location',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'Building',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'ID',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'TID',
'type': 'STRING',
'mode': 'NULLABLE'
}, {
'name': 'no_of_rows',
'type': 'NUMERIC',
'mode': 'NULLABLE'
}
# Convert the dict-based schema definitions into BigQuery SchemaField objects
# and collect the column names for later validation.
def format_schema(schema):
    formatted_schema = []
    column_list = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
        column_list.append(row['name'])
    return formatted_schema, set(column_list)
message_job_config = bigquery.LoadJobConfig()
message_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
message_job_config.schema, message_column_list = format_schema(message_table_schema)
print(message_column_list)
error_job_config = bigquery.LoadJobConfig()
error_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
error_job_config.schema, error_column_list = format_schema(error_table_schema)
print(error_column_list)
agg_job_config = bigquery.LoadJobConfig()
agg_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
agg_job_config.schema, agg_column_list = format_schema(table_schema_agri)
print(agg_column_list)
# Parses each Pub/Sub message and tags it either as a formatted message
# or as an error record.
class ProcessMessage(beam.DoFn):
    def process(self, message):
        json_object = []
        try:
            message_str = message.decode("utf-8")
            json_message = eval(message_str)
            if isinstance(json_message, dict) and set(message_column_list).issubset(json_message.keys()):
                print('{}'.format(("Formatted message", json_message)))
                yield tuple(["Formatted message", json_message])
            else:
                print('{}'.format(("Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'})))
                yield tuple(["Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'}])
        except Exception as e:
            print('{}'.format(("Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)})))
            yield tuple(["Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)}])
# Loads formatted rows (plus an aggregated count) or error rows into BigQuery
# using the module-level client and table references.
class WriteDataframeToBQ(beam.DoFn):
    def process(self, message):
        global project_id
        global bigquery_dataset
        global bigquery_message_table
        global bigquery_error_table
        global bigquery_message_agg_table
        global subscription_id
        global client
        global bq_dataset
        global bq_msg_table
        global bq_err_table
        global bq_agg_table
        if message[0] == 'Formatted message':
            try:
                print('Now inserting:', message[1])
                job = client.load_table_from_json(message[1], BQ_MSG_TABLE, job_config=message_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Message Table'.format(len(message[1])))
                # Creating aggregation dataframe
                df = pd.DataFrame(message[1])
                count_df = df.groupby(['MessageType', 'Location', 'Building', 'ID', 'TID']).size().reset_index(name='no_of_rows')
                job = client.load_table_from_json(count_df.to_dict('records'), BQ_AGG_TABLE, job_config=agg_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Aggregate Message Table'.format(str(count_df.shape[0])))
            except Exception as e:
                # print(job.errors)
                raise e
        else:
            try:
                error_rows = [{"Message": json.dumps(m["Original_Message"]) if isinstance(m["Original_Message"], dict) else str(m["Original_Message"]), "Error": m["Error"]} for m in message[1]]
                print('Now inserting:', error_rows)
                job = client.load_table_from_json(error_rows, BQ_ERR_TABLE, job_config=error_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Error Table'.format(len(error_rows)))
            except Exception as e:
                # print(job.errors)
                raise e
def run():
    pipeline_options = {
        'project': 'test-project',
        'runner': 'DataflowRunner',
        'region': 'us-east1',
        'staging_location': 'gs://bucket12345/tmp/',
        'temp_location': 'gs://bucket12345/tmp/',
        'template_location': 'gs://bucket12345/template/dataflow_test_template',
        'streaming': True
    }
    pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
    table = 'test-project:bq_load.message_table'
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=INPUT_SUBSCRIPTION, timestamp_attribute=None)
            | "Process Message" >> beam.ParDo(ProcessMessage())
            | 'Fixed-size windows' >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
            | beam.GroupByKey()
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ())
            | "Write Raw Data to BQ" >> beam.io.WriteToBigQuery(
                table, custom_gcs_temp_location='gs://bucket12345/template/dataflow_test_template')
        )
run()
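As a side note on the last pipeline step: beam.io.WriteToBigQuery is normally handed the row dictionaries together with a schema, rather than the output of a DoFn that has already loaded the data itself. The snippet below is only an illustrative sketch of that usual call, reusing the table name and message_table_schema defined above; rows stands for a hypothetical PCollection of dictionaries matching that schema:

# Sketch: write parsed row dicts with the built-in BigQuery connector.
(rows
 | "Write rows" >> beam.io.WriteToBigQuery(
     'test-project:bq_load.message_table',
     schema={'fields': list(message_table_schema)},
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
     custom_gcs_temp_location='gs://bucket12345/tmp/'))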
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow