How to access file name within a DoFn in an unbounded pipeline
I'm looking for a way to access the name of the file being processed during the data transformation within a DoFn.
My pipeline is as shown below:
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match()
        .filepattern(options.getInput())
        .continuously(Duration.standardSeconds(5),
            Watch.Growth.<String>never()))
    .apply(FileIO.readMatches()
        .withCompression(Compression.GZIP))
    .apply(XmlIO.<MyString>readFiles()
        .withRootElement("root")
        .withRecordElement("record")
        .withRecordClass(MyString.class)) // <-- This only returns the contents of the file
    .apply(ParDo.of(new ProcessRecord())) // <-- I need to access file name here
    .apply(ParDo.of(new FormatRecord()))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
    .apply(new CustomWrite(options));
Each file that is processed is an XML document. While processing its content, I also need the name of the file so that I can include it in the transformed record.
Is there a way to achieve this?
This post has a similar question, but since I'm using XmlIO I haven't found a way to access the file metadata.
Below is an approach I found online, but I'm not sure whether it can be used in the pipeline described above.
p.apply(FileIO.match()
        .filepattern(options.getInput())
        .continuously(Duration.standardSeconds(5),
            Watch.Growth.<String>never())) // File Metadata
    .apply(FileIO.readMatches()
        .withCompression(Compression.GZIP)) // Readable Files
    .apply(MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), new TypeDescriptor<ReadableFile>() {}))
        .via((ReadableFile file) -> {
            return KV.of(file.getMetadata().resourceId().getFilename(), file);
        })
    );
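One possible way to build on the keyed ReadableFile above is to skip XmlIO and parse each file inside a DoFn, so that every record can be emitted together with its file name. The sketch below shows only the parsing step, using the JDK's StAX parser so that it runs without Beam. The class FileNameTagger and the method tagRecords are hypothetical names, and the sketch assumes each record element contains plain text only (no nested elements), which may not match what MyString actually maps.

```java
import java.io.InputStream;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical helper (not part of Beam): tags each record with its source file name. */
final class FileNameTagger {

    private FileNameTagger() {}

    /**
     * Reads the XML stream and returns one (fileName, recordText) pair per
     * element named recordElement. Assumes records hold text only.
     */
    static List<Map.Entry<String, String>> tagRecords(
            String fileName, InputStream xml, String recordElement) {
        List<Map.Entry<String, String>> tagged = new ArrayList<>();
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // Do not process DTDs or external entities in input files.
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
        try {
            XMLStreamReader reader = factory.createXMLStreamReader(xml);
            while (reader.hasNext()) {
                if (reader.next() == XMLStreamConstants.START_ELEMENT
                        && recordElement.equals(reader.getLocalName())) {
                    // getElementText() consumes the element up to its end tag.
                    tagged.add(new SimpleEntry<>(fileName, reader.getElementText()));
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Could not parse XML in " + fileName, e);
        }
        return tagged;
    }
}
```

Inside a DoFn that receives a ReadableFile, the helper could presumably be called with file.getMetadata().resourceId().getFilename() as the name and Channels.newInputStream(file.open()) as the stream, since open() returns a channel that already applies the configured decompression. Each returned pair would then be emitted as a KV. Note that this gives up XmlIO's JAXB mapping, so it is a trade-off rather than a drop-in replacement.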
Any suggestions are highly appreciated. Thank you for your time reviewing this.
EDIT:
I took Alexey's advice and implemented a custom XmlIO. It would be nice if we could just extend the class we need and override the appropriate method. In this specific case, however, there was a reference to a method that is protected within the SDK, so I couldn't easily override what I needed and instead ended up copying a whole bunch of files. While this works for now, I hope that in the future there is a more straightforward way to access the file metadata in these IO implementations.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
