Move files after a streaming Dataflow pipeline has processed them
I created a streaming Apache Beam pipeline that reads files from GCS folders and inserts their contents into Bigtable. It works perfectly, but I would like to move the processed files to a new GCS folder (an "OK" folder) to indicate that the pipeline has processed them. Is there a technical way to do that with Apache Beam?
```java
Pipeline p = Pipeline.create(options);

// Connection settings for the target Bigtable table
CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withAppProfileId("default")
        .withTableId(options.getBigtableTableId())
        .build();

// pipeline: keep polling for new files, convert lines to Bigtable mutations, write them
p.apply("read from csv",
        TextIO.read()
            .from(options.getInputFilePattern())
            .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))
    .apply("Transform to Bigtable",
        ParDo.of(new TextToBigtableFn(options.getBigtableInstanceId(), options.getBigtableTableId())))
    .apply("write into bigtable", CloudBigtableIO.writeToTable(bigtableTableConfig));

p.run();
```
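Whatever mechanism ends up performing the move, the pipeline needs a destination for each processed file. The sketch below is a hypothetical helper (`OkFolderPaths.toOkPath` is not part of Beam or of the code above) that maps a file under the input prefix to the same relative path under an "OK" prefix. The bucket and folder names are placeholders.

```java
/** Hypothetical helper: computes where a processed file should go in the "OK" folder. */
public final class OkFolderPaths {
    private OkFolderPaths() {}

    /**
     * Returns okPrefix + (path of sourcePath relative to inputPrefix).
     * Both prefixes are expected to end with "/", e.g. "gs://my-bucket/input/".
     */
    public static String toOkPath(String sourcePath, String inputPrefix, String okPrefix) {
        if (!sourcePath.startsWith(inputPrefix)) {
            throw new IllegalArgumentException(sourcePath + " is not under " + inputPrefix);
        }
        // Keep any subfolders below the input prefix, e.g. input/2021/a.csv -> ok/2021/a.csv
        return okPrefix + sourcePath.substring(inputPrefix.length());
    }

    public static void main(String[] args) {
        // prints gs://my-bucket/ok/2021/data.csv
        System.out.println(toOkPath("gs://my-bucket/input/2021/data.csv",
                "gs://my-bucket/input/", "gs://my-bucket/ok/"));
    }
}
```

Inside a Beam step, the source and destination strings could be turned into `ResourceId`s with `FileSystems.matchNewResource(path, false)` and passed as single-element lists to `FileSystems.rename(...)`. Keeping the OK prefix outside the input file pattern ensures the moved files no longer match that pattern.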
Solution 1
Unfortunately, applying `CloudBigtableIO` does not return a meaningful result that a downstream step could wait on (it returns a `PDone`), though this is something we'd like to change.
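Since the write yields only a `PDone`, one possible workaround (an assumption, not something the answer proposes) is to handle each file in a single step: write all of its rows, and only then move the file. The sketch below shows that ordering as a local-filesystem analogue using `java.nio.file`; the `Consumer<String>` is a hypothetical stand-in for the Bigtable write. In an actual `DoFn` the write would go through a Bigtable client instead of `CloudBigtableIO`, and the move through Beam's `FileSystems.rename(...)`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.function.Consumer;

/** Local sketch of "write every row, then move the file"; the sink is a hypothetical Bigtable stand-in. */
public final class WriteThenMove {
    private WriteThenMove() {}

    /**
     * Sends each line of {@code file} to {@code sink}, then moves the file into {@code okDir}.
     * If the sink throws, the method exits before the move, so the file stays where it was.
     */
    public static Path processThenMove(Path file, Path okDir, Consumer<String> sink) {
        try {
            List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            for (String line : lines) {
                sink.accept(line); // stand-in for writing one row to Bigtable
            }
            Files.createDirectories(okDir);
            Path target = okDir.resolve(file.getFileName());
            // Only reached once every line has been handed to the sink without an exception
            return Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Coupling the write and the move in one step gives up the batching that `CloudBigtableIO` provides, and a retried step may write some rows a second time, so this trades throughput for a simple "moved means written" guarantee.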
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | robertwb |
