How to execute an independent function after a write to BigQuery in a Dataflow batch pipeline using Python?

I am trying to move a file to a different bucket after it has been successfully loaded into BigQuery, but the independent function runs before the pipeline even starts: process(file_name) is called while the pipeline graph is being constructed, not when the WriteToBigQuery step finishes. How can I time the execution of the independent function 'process()' so that it runs only after the load to BigQuery has succeeded?

import argparse
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.options.pipeline_options import PipelineOptions
from avro.datafile import DataFileReader
from avro.io import DatumReader
from google.cloud import storage

def process(file_name):
    """Moves a blob from one bucket to another."""
    storage_client = storage.Client()
    source_bucket = storage_client.bucket('source_bucket')
    destination_bucket = storage_client.bucket('destination_bucket')
    source_blob = source_bucket.blob(file_name)
    destination_blob_name = file_name
    blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
    source_bucket.delete_blob(file_name)
    print('File {} is transferred from {} to {}'.format(file_name, source_bucket, destination_bucket))

def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    with beam.Pipeline(options=pipeline_options) as p:
        client = storage.Client()
        bucket = client.bucket('sourcebucket')
        blob = bucket.get_blob('sourcefile.avro')
        downloaded_blob = "temporary.avro"
        blob.download_to_filename(downloaded_blob)
        reader = DataFileReader(open(downloaded_blob, "rb"), DatumReader())
        file_name = blob.name
        records = [r for r in reader]
        # Populate pandas.DataFrame with the Avro records
        df = pd.DataFrame.from_records(records)

        (
            convert.to_pcollection(df, pipeline=p, label="pcollection")
            | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
            | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
                'projectID:datasetID.table',
                schema='SCHEMA_AUTODETECT',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
            # process(file_name) is evaluated right here, while the pipeline
            # graph is still being built, which is why the file is moved
            # before the BigQuery load ever runs.
            | 'move' >> beam.FlatMap(process(file_name))
        )
        
        
if __name__ == "__main__":
    run()


Solution 1:[1]

I'm not from a Python background, but I have implemented something similar in Java. The way you would want to approach this is to use FileIO.match(), which gives you a PCollection of MatchResult.Metadata for the new files. You then split the processing into two branches: in the first, one collection (PCollection1) takes the metadata, reads the files with FileIO.readMatches() (or TextIO.read()), processes the data further, and writes it to the sink.

In the other branch you hold the metadata back with Wait.on(PCollection1), i.e. apply(Wait.on(PCollection1)).apply(ParDo.of(new YourDeleteFn())), and inside that ParDo create a storage client and move the files that have already been processed into the other bucket.
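Since the question uses the Python SDK, here is a rough sketch of the same two-branch idea in Python. This is an illustration under a few assumptions, not a drop-in answer: it assumes a recent Beam release in which WriteToBigQuery returns a WriteResult exposing destination_load_jobid_pairs as a completion signal for FILE_LOADS, and it uses a side input in place of Java's Wait.on (in a batch pipeline a ParDo does not start until its side inputs have been fully computed). The bucket and table names are the question's placeholders, and read_avro_records is a hypothetical helper that assumes fastavro is installed.

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions


def read_avro_records(readable_file):
    """Hypothetical helper: parses one matched Avro file into dict records."""
    import fastavro  # assumed to be installed on the workers
    with readable_file.open() as f:
        for record in fastavro.reader(f):
            yield dict(record)


def move_file(metadata, unused_signal):
    """Moves one already-loaded file to the destination bucket."""
    from google.cloud import storage
    client = storage.Client()
    source_bucket = client.bucket('source_bucket')
    destination_bucket = client.bucket('destination_bucket')
    file_name = metadata.path.split('/')[-1]
    source_bucket.copy_blob(
        source_bucket.blob(file_name), destination_bucket, file_name)
    source_bucket.delete_blob(file_name)


def run(argv=None):
    pipeline_options = PipelineOptions(argv, save_main_session=True)
    with beam.Pipeline(options=pipeline_options) as p:
        # The matched-file metadata is the collection both branches start from.
        matches = p | fileio.MatchFiles('gs://source_bucket/*.avro')

        # Branch 1: read the matched files and load them into BigQuery.
        result = (
            matches
            | fileio.ReadMatches()
            | beam.FlatMap(read_avro_records)
            | beam.io.WriteToBigQuery(
                'projectID:datasetID.table',
                schema='SCHEMA_AUTODETECT',
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                method=beam.io.WriteToBigQuery.Method.FILE_LOADS))

        # Branch 2: move the files, gated on the load-job signal. The side
        # input plays the role of Java's Wait.on here: move_file cannot run
        # until destination_load_jobid_pairs has been fully computed.
        _ = matches | 'move' >> beam.Map(
            move_file,
            unused_signal=beam.pvalue.AsIter(
                result.destination_load_jobid_pairs))


if __name__ == "__main__":
    run()

The design point carried over from the Java answer is that the file-moving branch consumes the original metadata collection but is held back by a signal derived from the completed write, so no file is touched before its load has finished.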

Sources

[1] Source: Stack Overflow

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
