Move file after Streaming Pipeline dataflow

I created a streaming Apache Beam pipeline that reads files from GCS folders and inserts them into Bigtable. It works perfectly, but I would like to move the processed files to a new GCS folder (an "OK" folder) to indicate that these files have been processed by the pipeline. Is there a technical solution to do that with Apache Beam?

    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
            new CloudBigtableTableConfiguration.Builder()
                    .withProjectId(options.getProjectId())
                    .withInstanceId(options.getBigtableInstanceId())
                    .withAppProfileId("default")
                    .withTableId(options.getBigtableTableId())
                    .build();

    // Pipeline: watch for new CSV files, transform each line for Bigtable, then write.
    p.apply("read from csv", TextIO.read().from(options.getInputFilePattern())
                    .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))
            .apply("Transform to Bigtable", ParDo.of(new TextToBigtableFn(options.getBigtableInstanceId(), options.getBigtableTableId())))
            .apply("write into bigtable", CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run();


Solution 1:[1]

Unfortunately, applying a CloudBigtableIO write does not return a meaningful result that a later step can follow (it returns a PDone, which ends that branch of the pipeline), though this is something we'd like to change.
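Because the write cannot be followed, any move step has to be triggered somewhere else, and nothing in the pipeline as written confirms that a file's rows have reached Bigtable before the file is moved. As a small, hedged sketch of one piece of such a step, the helper below only computes where a processed file would land in the "OK" folder. The class name `OkFolderPaths`, the method `okDestination`, and the example bucket paths are hypothetical and do not come from the question. The move itself is not shown; it could be done with, for example, Beam's `FileSystems.rename`.

```java
public final class OkFolderPaths {
    private OkFolderPaths() {}

    /**
     * Returns the path a processed file would have inside the OK folder.
     * Only the file name is kept; the source directory structure is dropped.
     * Both arguments are plain path strings (hypothetical helper, not a Beam API).
     */
    public static String okDestination(String sourcePath, String okFolder) {
        // Everything after the last '/' is treated as the file name.
        String fileName = sourcePath.substring(sourcePath.lastIndexOf('/') + 1);
        if (fileName.isEmpty()) {
            throw new IllegalArgumentException("Not a file path: " + sourcePath);
        }
        // Accept the OK folder with or without a trailing slash.
        String folder = okFolder.endsWith("/") ? okFolder : okFolder + "/";
        return folder + fileName;
    }

    public static void main(String[] args) {
        // Example bucket and folder names are assumptions for illustration.
        System.out.println(
                okDestination("gs://my-bucket/input/data.csv", "gs://my-bucket/OK"));
    }
}
```

Keeping only the file name means two files with the same name in different input folders would collide in the OK folder; if that can happen, the destination would need to preserve part of the source path.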

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 robertwb