How to create an Apache Beam pipeline in Python that reads 2 lakh (200,000) JSON records from Pub/Sub and inserts them into Bigtable (proper format)
I want to read 2 lakh (200,000) JSON messages from a Pub/Sub topic and write the data to Bigtable with proper column qualifier names and distinct row keys, using a Dataflow job written in Python. With the code below I am getting an error; if anyone has an idea, please help.
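For reference, the pipeline below expects each Pub/Sub message body to be a flat JSON object encoded as UTF-8 bytes, where every key becomes a column qualifier under family 'cf1'. Here is a minimal publisher sketch for producing test messages; it assumes the subscription "two_lac_data-sub" is attached to a topic named "two_lac_data" (an inference from the subscription name), and the message fields are hypothetical:

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("gcp-project", "two_lac_data")  # topic name is an assumption

# Hypothetical fields: each key ends up as a column qualifier in 'cf1'.
message = {"id": "123", "name": "alice", "city": "london"}
future = publisher.publish(topic_path, json.dumps(message).encode("utf-8"))
print(future.result())  # prints the message ID once the publish succeeds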
import datetime
import json

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud.bigtable import row

# Classes
class CreateRowFn(beam.DoFn):
    """Converts one JSON message (as a string) into a Bigtable DirectRow."""

    def __init__(self, pipeline_options):
        self.instance_id = pipeline_options.bigtable_instance
        self.table_id = pipeline_options.bigtable_table

    def process(self, element, window=beam.DoFn.WindowParam):
        # process() receives a single element here, not a (key, element)
        # pair: the extra `key` parameter in the original signature makes
        # Beam fail at runtime because only one positional value is passed.
        timestamp = datetime.datetime.utcnow()
        # Timestamp-only keys can collide within the same microsecond and
        # concentrate writes on one tablet; see the row-key sketch below.
        row_key = "final" + str(timestamp)
        direct_row = row.DirectRow(row_key=row_key)
        data_dict = json.loads(element)
        for key, value in data_dict.items():
            # Each JSON field becomes a cell in column family 'cf1' with
            # the field name as the column qualifier.
            direct_row.set_cell('cf1', str(key), str(value), timestamp)
        yield direct_row
class XyzOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument('--bigtable_project', default='nested')
        parser.add_argument('--bigtable_instance', default='instance')
        parser.add_argument('--bigtable_table', default='table')
pipeline_options = XyzOptions(
    save_main_session=True, streaming=True,
    runner='DataflowRunner',
    project="gcp-project",
    region="europe-west2",
    bigtable_project="gcp-project",
    bigtable_instance="bigtable",  # the instance ID must be used, not the display name
    bigtable_table="pubtobigtable",
    autoscaling_algorithm="THROUGHPUT_BASED",
    num_workers=3,  # default = 3, max = 100
    max_num_workers=10)
# Pipeline
def run(argv=None):
    input_subscription = "projects/gcp-project/subscriptions/two_lac_data-sub"
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
                subscription=input_subscription).with_output_types(bytes)
            | 'Decode UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
            | 'Convert string to row object' >> beam.ParDo(CreateRowFn(pipeline_options))
            | 'Write row object to Bigtable' >> WriteToBigTable(
                project_id=pipeline_options.bigtable_project,
                instance_id=pipeline_options.bigtable_instance,
                table_id=pipeline_options.bigtable_table)
        )
        # The `with` block runs the pipeline and waits for completion,
        # so no explicit p.run() / wait_until_finish() is needed here.

if __name__ == '__main__':
    run()
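One more note on the "different row keys" requirement: a key built only from "final" plus a timestamp can collide when two messages arrive in the same microsecond, and sequential keys funnel all writes to one tablet. A small sketch of an alternative (the "final" prefix and key layout are assumptions for illustration, not part of the original code):

import uuid

def make_row_key(prefix: str = "final") -> bytes:
    # prefix + random UUID keeps keys unique and spreads writes across
    # tablets instead of landing them all on the newest timestamp range.
    return f"{prefix}#{uuid.uuid4().hex}".encode("utf-8")

Swapping this in for row_key inside CreateRowFn.process keeps one row per message while avoiding both collisions and hotspotting.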