Pub/Sub to BQ load using Beam
I am trying to write Beam code that simply loads data from Pub/Sub to BigQuery. When I run the same code from Colab it works perfectly, but when I create a template or run it as a Dataflow job I get errors such as names not being defined, etc.
Error 1:
Invalid JSON payload received. Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list. [while running 'Write Raw Data to BQ/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)-ptransform-60']
Error 2:
line 179, in process UnboundLocalError: local variable 'job' referenced before assignment [while running 'Write Raw Data to Big Query-ptransform-63'] passed through: ==> dist_proc/dax/workflow/worker/fnapi_service_impl.cc:922
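For context, beam.io.WriteToBigQuery with streaming inserts expects each element to be a plain Python dict whose keys match the destination table's columns, and the "Invalid JSON payload" error above is typically what BigQuery returns when the rows it receives are not in that shape. A minimal sketch of the expected element shape and write step, assuming the message_table schema from the code below and a hypothetical parse_pubsub_payload helper:

import json
import apache_beam as beam

def parse_pubsub_payload(data):
    # Decode the Pub/Sub message bytes into a plain dict, which is the row
    # shape that beam.io.WriteToBigQuery expects for streaming inserts.
    return json.loads(data.decode("utf-8"))

# Example payload using the message_table columns (illustrative values only):
example_row = parse_pubsub_payload(
    b'{"MessageType": "A", "Location": "L1", "Building": "B1", '
    b'"DateTime": "2023-01-01T00:00:00", "ID": "1", "TID": "2"}'
)

# In a pipeline, such dicts can be written directly, e.g.:
# ... | beam.Map(parse_pubsub_payload)
#     | beam.io.WriteToBigQuery('test-project:bq_load.message_table')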
Code:
import argparse
import json
import os
import logging
import pandas as pd
import apache_beam as beam
from google.cloud import bigquery, pubsub_v1
from google.cloud.bigquery import table
import google.cloud.storage.client as gcs
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
import apache_beam.transforms.window as window
serviceAccount = '/content/test-project-0c3a740e58c8.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]= serviceAccount
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
project_id = "test-project"
bigquery_dataset = "bq_load"
bigquery_message_table = "message_table"
bigquery_error_table = "Error_Table"
bigquery_message_agg_table = "message_table_aggri"
subscription_id = "topic_message-sub"
INPUT_SUBSCRIPTION = pubsub_v1.SubscriberClient().subscription_path(project_id, subscription_id)
WINDOW_SIZE = 100
client = bigquery.Client(project = project_id)
bq_dataset = client.dataset(bigquery_dataset)
bq_msg_table = bq_dataset.table(bigquery_message_table)
bq_err_table = bq_dataset.table(bigquery_error_table)
bq_agg_table = bq_dataset.table(bigquery_message_agg_table)
message_table_schema = (
    {'name': 'MessageType', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'Location', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'Building', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'DateTime', 'type': 'DATETIME', 'mode': 'NULLABLE'},
    {'name': 'ID', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'TID', 'type': 'STRING', 'mode': 'NULLABLE'}
)
error_table_schema = (
    {'name': 'Message', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'Error', 'type': 'STRING', 'mode': 'NULLABLE'}
)
table_schema_agri = (
    {'name': 'MessageType', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'Location', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'Building', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'ID', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'TID', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'no_of_rows', 'type': 'NUMERIC', 'mode': 'NULLABLE'}
)
def format_schema(schema):
    formatted_schema = []
    column_list = []
    for row in schema:
        formatted_schema.append(bigquery.SchemaField(row['name'], row['type'], row['mode']))
        column_list.append(row['name'])
    return formatted_schema, set(column_list)
message_job_config = bigquery.LoadJobConfig()
message_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
message_job_config.schema, message_column_list = format_schema(message_table_schema)
print(message_column_list)
error_job_config = bigquery.LoadJobConfig()
error_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
error_job_config.schema, error_column_list = format_schema(error_table_schema)
print(error_column_list)
agg_job_config = bigquery.LoadJobConfig()
agg_job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
agg_job_config.schema, agg_column_list = format_schema(table_schema_agri)
print(agg_column_list)
class ProcessMessage(beam.DoFn):
    def process(self, message):
        json_object = []
        try:
            message_str = message.decode("utf-8")
            json_message = eval(message_str)
            if isinstance(json_message, dict) and set(message_column_list).issubset(json_message.keys()):
                print('{}'.format(("Formatted message", json_message)))
                yield tuple(["Formatted message", json_message])
            else:
                print('{}'.format(("Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'})))
                yield tuple(["Error_Message", {"Original_Message": json_message, "Error": 'Unable to map input to table schema'}])
        except Exception as e:
            print('{}'.format(("Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)})))
            yield tuple(["Error_Message", {"Original_Message": message_str if message_str else message, "Error": str(e)}])
class WriteDataframeToBQ(beam.DoFn):
    def process(self, message):
        global project_id
        global bigquery_dataset
        global bigquery_message_table
        global bigquery_error_table
        global bigquery_message_agg_table
        global subscription_id
        global client
        global bq_dataset
        global bq_msg_table
        global bq_err_table
        global bq_agg_table
        if message[0] == 'Formatted message':
            try:
                print('Now inserting:', message[1])
                job = client.load_table_from_json(message[1], BQ_MSG_TABLE, job_config = message_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Messsage Table'.format(len(message[1])))
                # Creating aggregation dataframe
                df = pd.DataFrame(message[1])
                count_df = df.groupby(['MessageType', 'Location', 'Building', 'ID', 'TID']).size().reset_index(name='no_of_rows')
                job = client.load_table_from_json(count_df.to_dict('records'), BQ_AGG_TABLE, job_config = agg_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Agree Messsage Table'.format(str(count_df.shape[0])))
            except Exception as e:
                # print(job.errors)
                raise e
        else:
            try:
                error_rows = [{"Message": json.dumps(m["Original_Message"]) if isinstance(m["Original_Message"], dict) else str(m["Original_Message"]), "Error": m["Error"]} for m in message[1]]
                print('Now inserting:', error_rows)
                job = client.load_table_from_json(error_rows, BQ_ERR_TABLE, job_config = error_job_config)
                job.result()  # Waits for the job to complete.
                print('Total {} messages successfully written to BigQuery Error Table'.format(len(error_rows)))
            except Exception as e:
                # print(job.errors)
                raise e
def run():
    pipeline_options = {
        'project': 'test-project',
        'runner': 'DataflowRunner',
        'region': 'us-east1',
        'staging_location': 'gs://bucket12345/tmp/',
        'temp_location': 'gs://bucket12345/tmp/',
        'template_location': 'gs://bucket12345/template/dataflow_test_template',
        'streaming': True
    }
    pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
    table = 'test-project:bq_load.message_table'
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
                subscription=INPUT_SUBSCRIPTION, timestamp_attribute=None)
            | "Process Message" >> beam.ParDo(ProcessMessage())
            | 'Fixed-size windows' >> beam.WindowInto(window.FixedWindows(WINDOW_SIZE))
            | beam.GroupByKey()
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ())
            | "Write Raw Data to BQ" >> beam.io.WriteToBigQuery(
                table, custom_gcs_temp_location='gs://bucket12345/template/dataflow_test_template')
        )
run()
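One thing worth separating out, since the pipeline works in Colab but fails as a template: everything created at module level here (the /content/... credentials path, the bigquery.Client, the *_job_config objects and the lowercase table references) lives in the Colab session, and Dataflow workers do not automatically see that state unless the save_main_session pipeline option is set. Independently of that, WriteDataframeToBQ refers to BQ_MSG_TABLE, BQ_AGG_TABLE and BQ_ERR_TABLE in upper case while the globals are lower case, so those names are undefined wherever the DoFn runs; the UnboundLocalError on job is the usual symptom when an except block touches job after load_table_from_json itself raised, so job was never assigned. Below is a sketch only, using a hypothetical LoadRowsToBQ DoFn that builds its BigQuery client per worker in setup() instead of relying on module-level globals:

import apache_beam as beam
from google.cloud import bigquery

class LoadRowsToBQ(beam.DoFn):
    # Hypothetical variant of WriteDataframeToBQ: no module-level globals,
    # the BigQuery client is created on the worker in setup().

    def __init__(self, project_id, dataset, table):
        self.project_id = project_id
        self.dataset = dataset
        self.table = table
        self.client = None

    def setup(self):
        # Runs once per DoFn instance on the worker, so the client exists
        # wherever process() actually executes, not only in the Colab session.
        self.client = bigquery.Client(project=self.project_id)

    def process(self, keyed_rows):
        _, rows = keyed_rows          # after GroupByKey: (key, iterable of dicts)
        rows = list(rows)
        table_ref = "{}.{}.{}".format(self.project_id, self.dataset, self.table)
        job = self.client.load_table_from_json(rows, table_ref)
        job.result()                  # wait for the load job to finish
        yield 'loaded {} rows into {}'.format(len(rows), table_ref)

Passing the project, dataset and table names through the constructor keeps the DoFn picklable and removes the dependence on GOOGLE_APPLICATION_CREDENTIALS pointing at a Colab-only file; on Dataflow the workers authenticate with the job's worker service account instead.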
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow