No attribute 'TableReference' in Apache Beam when trying to write to BigQuery [Jupyter Notebook]

I am attempting to write a pipeline in Apache Beam (Python) to BigQuery from a local Jupyter Notebook, but it seems to be failing because the following import is unsuccessful.

File: ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py

try:
  from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
  from apache_beam.io.gcp.internal.clients.bigquery import TableReference
except ImportError:
  DatasetReference = None
  TableReference = None

Actual error thrown:

--> 240   if isinstance(table, TableReference):
    241     return TableReference(
    242         projectId=table.projectId,
    243         datasetId=table.datasetId,
    244         tableId=table.tableId)
    245   elif callable(table):

TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

I wasn't able to find 'TableReference' under apache_beam.io.gcp.internal.clients.bigquery. Since the import above fails, the except branch silently sets TableReference to None, and passing None as the second argument to isinstance() raises exactly this TypeError.
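
To surface the underlying error instead of the silent fallback to None, the same import can be run directly in a notebook cell (a minimal check, using the import path from bigquery_tools.py above):

# Running the import outside the try/except shows the real
# ImportError and its cause in the traceback.
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
print(TableReference)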

Screenshot of the Windows Explorer folder location:

These are the packages I have installed:

!pip install google-cloud
!pip install google-cloud-pubsub
!pip install google-cloud-bigquery
!pip install apache-beam[gcp]
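
To check whether these installs left conflicting dependencies in the local environment, pip can report them directly:

# Look for broken/conflicting dependencies and print installed versions
!pip check
!pip show apache-beam google-cloud-bigquery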

I read in an article that I need to install apache-beam[gcp] instead of apache-beam (which I was doing earlier).

Note: the code runs perfectly fine in Google Colab.
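
Since it works in Colab, a quick way to compare the two environments is to print the interpreter and Beam versions in each (the local interpreter is Python 3.10, per the site-packages path above):

import sys
import apache_beam as beam

# Print the Python and Beam versions for comparison with Colab
print(sys.version)
print(beam.__version__)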

Here is the complete code:

import json
import time
import logging
import apache_beam as beam
from google.cloud import bigquery
from apache_beam.options.pipeline_options import PipelineOptions

PROJECT_ID = "commanding-bee-322221"
BIGQUERY_DATASET = "Sample_Dataset"
BIGQUERY_MESSAGE_TABLE = "Message_Table"
SUBSCRIPTION_ID = "My-First-Test-Topic-Sub"

BQ_CLIENT = bigquery.Client(project=PROJECT_ID)
BQ_DATASET = BQ_CLIENT.dataset(BIGQUERY_DATASET)
BQ_MSG_TABLE = BQ_DATASET.table(BIGQUERY_MESSAGE_TABLE)

logging.basicConfig(level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)

pipeline_options = {
    'project': PROJECT_ID,
    # 'runner': 'DataflowRunner',
    'region': 'us-east1',
    'staging_location': 'gs://my-test-bucket/tmp/',
    'temp_location': 'gs://my-test-bucket/tmp/',
    # 'template_location': 'gs://my-test-bucket/tmp/My_Template',
    'save_main_session': False,
    'streaming': True
}

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p = beam.Pipeline(options=pipeline_options)

table_schema = 'MessageType:STRING,Location:STRING,Building:STRING,DateTime:DATETIME,ID:STRING,TID:STRING'
table = f'{PROJECT_ID}:{BIGQUERY_DATASET}.{BIGQUERY_MESSAGE_TABLE}'

class create_formatted_message(beam.DoFn):
    def process(self, record):
        # Decode the Pub/Sub payload and parse it as JSON
        message_str = record.decode("utf-8")
        json_message = json.loads(message_str)
        print(json_message)
        # Construct the row to insert into BigQuery
        rows_to_insert = [{
            "MessageType": str(json_message["MessageType"]),
            "Location": str(json_message["Location"]),
            "Building": str(json_message["Building"]),
            "DateTime": str(json_message["DateTime"]),
            "ID": str(json_message["ID"]),
            "TID": str(json_message["TID"]),
        }]
        return rows_to_insert

print(table)
processPubSubMessage = (
    p
    | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=f'projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}', timestamp_attribute=None)
    | "Create Formatted Message" >> beam.ParDo(create_formatted_message())
    | "Write TO BQ" >> beam.io.WriteToBigQuery(
        table,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location='gs://my-test-bucket/tmp/'
       )
    )

result = p.run()
# result.wait_until_finish()

time.sleep(20)
result.cancel()
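
For reference, the DoFn can be exercised on its own with a made-up sample payload (the field values below are hypothetical):

# Hypothetical sample message with the expected JSON fields
sample = b'{"MessageType": "Alert", "Location": "NY", "Building": "B1", "DateTime": "2021-08-30 10:00:00", "ID": "123", "TID": "T-1"}'
print(create_formatted_message().process(sample))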


