Add timestamp in output file name
We have a long-running pipeline and would like to add a timestamp to the output filenames as close to the pipeline's end time as possible.
The solution we have come up with is a custom FilenamePolicy, which seems to work, but every run prints a warning message about deleting the temp files, even though the files were deleted successfully. We are using Apache Beam version 2.31.0.
{"@timestamp":"2022-04-26T16:39:39.182-04:00","@version":"1","message":"Failed to match temporary files under: [C:\\Users\\userId\\Desktop\\tbd\\output\\.temp-beam-1deb0fa8-96fe-4989-b5aa-ea508906cb96\\].","logger_name":"org.apache.beam.sdk.io.FileBasedSink","thread_name":"direct-runner-worker","severity":"WARN","level_value":30000}
Here is the code snippet:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
String now = DATE_TIME_FORMATTER.format(LocalDateTime.now());
System.out.println("current time" + now);
Write writer = TextIO.write().withNumShards(1)
    .withTempDirectory(
        FileSystems.matchNewResource("C:\\Users\\userId\\Desktop\\tbd\\output", true))
    .to(new FilenamePolicy() {
      @Override
      public ResourceId windowedFilename(int shardNumber, int numShards, BoundedWindow window,
          PaneInfo paneInfo, OutputFileHints outputFileHints) {
        throw new RuntimeException("not implemented");
      }

      @Override
      public @Nullable ResourceId unwindowedFilename(int shardNumber, int numShards,
          OutputFileHints outputFileHints) {
        String time = DATE_TIME_FORMATTER.format(LocalDateTime.now());
        String filename =
            String.format(
                "%s-%s-of-%s%s",
                "C:\\Users\\userId\\Desktop\\tbd\\output\\file-" + time,
                shardNumber,
                numShards,
                outputFileHints.getSuggestedFilenameSuffix());
        return FileSystems.matchNewResource(filename, false);
      }
    });
pipeline.apply(Create.of("test", "test2", "test3")).apply(ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void process(ProcessContext c) {
    try {
      Thread.sleep(120000);
      c.output(c.element());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
})).apply(writer);
pipeline.run();
Solution 1:[1]
This happens because the "*" wildcard in the temp-directory path cannot be resolved and matched on Windows. The resulting exception is ignored and only logged as this warning message. See the comment.
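The question does not show how DATE_TIME_FORMATTER is defined, so here is a minimal sketch of the naming part only, using plain JDK classes. The pattern "yyyyMMdd-HHmmss", the class name TimestampedNames and the helper shardName are assumptions made for illustration and are not part of Beam. The pattern avoids ":" because Windows does not allow that character in file names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampedNames {
    // Assumed pattern (not from the question): digits and '-' only,
    // so the result is a valid file name on Windows as well.
    static final DateTimeFormatter DATE_TIME_FORMATTER =
        DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss");

    // Hypothetical helper: builds "<prefix>-<time>-<shard>-of-<numShards><suffix>",
    // the same layout the question produces with String.format.
    static String shardName(String prefix, LocalDateTime time,
                            int shardNumber, int numShards, String suffix) {
        return String.format("%s-%s-%s-of-%s%s",
            prefix, DATE_TIME_FORMATTER.format(time), shardNumber, numShards, suffix);
    }

    public static void main(String[] args) {
        // Uses the current time, so the printed name differs on every run.
        System.out.println(shardName("file", LocalDateTime.now(), 0, 1, ".txt"));
    }
}
```

Inside unwindowedFilename, the returned string could be appended to the output directory and passed to FileSystems.matchNewResource(filename, false), as the question's code already does.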
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | ningk |