No attribute 'TableReference' in Apache Beam when trying to write to BigQuery [Jupyter Notebook]
I am attempting to write an Apache Beam (Python) pipeline to BigQuery from a local Jupyter Notebook, but it seems to be failing because the following import is unsuccessful.
In ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:
try:
    from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
    from apache_beam.io.gcp.internal.clients.bigquery import TableReference
except ImportError:
    DatasetReference = None
    TableReference = None
The actual error thrown:
--> 240     if isinstance(table, TableReference):
    241       return TableReference(
    242           projectId=table.projectId,
    243           datasetId=table.datasetId,
    244           tableId=table.tableId)
    245     elif callable(table):

TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
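This matches the except branch above: when the import fails, TableReference is bound to None, and isinstance() rejects None as its second argument. A minimal sketch reproducing the same TypeError:

# With the import failed, TableReference is None, so the check in
# bigquery_tools.py raises exactly this TypeError:
TableReference = None
isinstance('my-table', TableReference)
# TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union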
I also wasn't able to find TableReference anywhere under apache_beam.io.gcp.internal.clients.bigquery.
[Screenshot of the Windows Explorer view of that folder]
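One way to surface the real ImportError in the notebook (a diagnostic I added, not part of the original traceback) is to run the failing import directly:

# Re-run the import that bigquery_tools.py swallows; the ImportError message
# usually names the missing dependency (possibly google-apitools, which the
# internal clients rely on -- an assumption, not confirmed here).
try:
    from apache_beam.io.gcp.internal.clients.bigquery import TableReference
    print("TableReference imported OK:", TableReference)
except ImportError as exc:
    print("Import failed:", exc)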
These are the packages I have installed:
!pip install google-cloud
!pip install google-cloud-pubsub
!pip install google-cloud-bigquery
!pip install apache-beam[gcp]
I read in an article that I need to install apache-beam[gcp] instead of apache-beam (which I was doing earlier).
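For completeness, here is a quick way to confirm which versions the notebook kernel actually sees (my addition; the exact versions in play are not shown above):

import apache_beam
import google.cloud.bigquery

# Mismatched Beam / google-cloud-bigquery versions are a common source of
# broken internal imports, so it is worth printing both.
print("apache-beam:", apache_beam.__version__)
print("google-cloud-bigquery:", google.cloud.bigquery.__version__)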
Note: the code runs perfectly fine in Google Colab.
Here is the complete code:
from google.cloud.bigquery import table
import argparse
import json
import os
import time
import logging
import pandas as pd
import apache_beam as beam
from google.cloud import bigquery, pubsub_v1
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions, SetupOptions
import apache_beam.transforms.window as window
PROJECT_ID = "commanding-bee-322221"
BIGQUERY_DATASET = "Sample_Dataset"
BIGQUERY_MESSAGE_TABLE = "Message_Table"
SUBSCRIPTION_ID = "My-First-Test-Topic-Sub"
BQ_CLIENT = bigquery.Client(project=PROJECT_ID)
BQ_DATASET = BQ_CLIENT.dataset(BIGQUERY_DATASET)
BQ_MSG_TABLE = BQ_DATASET.table(BIGQUERY_MESSAGE_TABLE)
logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
pipeline_options = {
    'project': PROJECT_ID,
    # 'runner': 'DataflowRunner',
    'region': 'us-east1',
    'staging_location': 'gs://my-test-bucket/tmp/',
    'temp_location': 'gs://my-test-bucket/tmp/',
    # 'template_location': 'gs://my-test-bucket/tmp/My_Template',
    'save_main_session': False,
    'streaming': True
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=pipeline_options)

table_schema = 'MessageType:STRING,Location:STRING,Building:STRING,DateTime:DATETIME,ID:STRING,TID:STRING'
table = f'{PROJECT_ID}:{BIGQUERY_DATASET}.{BIGQUERY_MESSAGE_TABLE}'
class create_formatted_message(beam.DoFn):
    def process(self, record):
        message_str = record.decode("utf-8")
        # json.loads is safer than eval() for parsing the message payload
        json_message = json.loads(message_str)
        print(json_message)
        # Construct the record expected by WriteToBigQuery
        rows_to_insert = [{u"MessageType": u"{}".format(json_message["MessageType"]),
                           u"Location": u"{}".format(json_message["Location"]),
                           u"Building": u"{}".format(json_message["Building"]),
                           u"DateTime": u"{}".format(json_message["DateTime"]),
                           u"ID": u"{}".format(json_message["ID"]),
                           u"TID": u"{}".format(json_message["TID"])}]
        return rows_to_insert
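# Hypothetical example of the Pub/Sub message body the DoFn above expects
# (keys must match the table schema defined earlier):
# b'{"MessageType": "Alert", "Location": "NY", "Building": "B1",
#    "DateTime": "2021-09-01 12:00:00", "ID": "123", "TID": "t-456"}'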
print(table)
processPubSubMessage = (
    p
    | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(
        subscription=f'projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}',
        timestamp_attribute=None)
    | "Create Formatted Message" >> beam.ParDo(create_formatted_message())
    | "Write TO BQ" >> beam.io.WriteToBigQuery(
        table,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location='gs://my-test-bucket/tmp/')
)
result = p.run()
# result.wait_until_finish()

# Let the streaming pipeline run briefly, then cancel it
time.sleep(20)
result.cancel()
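To exercise the pipeline, a test message can be published along these lines (a minimal sketch; the topic name My-First-Test-Topic is assumed from the subscription ID, since only the subscription is shown above):

# Publish one test message so the streaming pipeline has input.
from google.cloud import pubsub_v1
import json

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(PROJECT_ID, "My-First-Test-Topic")  # assumed topic name
payload = {"MessageType": "Alert", "Location": "NY", "Building": "B1",
           "DateTime": "2021-09-01 12:00:00", "ID": "123", "TID": "t-456"}
publisher.publish(topic_path, json.dumps(payload).encode("utf-8")).result()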
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow