How to override the default metadata.lastModifiedMillis() of Apache Beam's FileIO with the file's actual last-modified time?
Use case: I have to filter files based on their last-modified time using Apache Beam (Java).
My Code:
PCollection<String> readfile = pipeline
    .apply(FileIO.match().filepattern(path)
        .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
    .setCoder(MetadataCoderV2.of())
    .apply(Filter.by(metadata -> {
      System.out.println("metadata: " + metadata.toString());
      return metadata.lastModifiedMillis() > current;
    }))
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());
Problem: Even though I am using the MetadataCoderV2 coder, I get the default value (0) for metadata.lastModifiedMillis(), as shown below:
Metadata{resourceId="fileName", sizeBytes=108, isReadSeekEfficient=true, lastModifiedMillis=0}
Can anyone please suggest how to resolve this issue?
Solution 1:[1]
You should register MetadataCoderV2 earlier in the pipeline, in the coder registry, before any transform is applied.
Also, ReadableFileCoder always uses MetadataCoder (v1), so you need to override it with a coder that handles Metadata v2.
pipeline.getCoderRegistry().registerCoderForClass(MatchResult.Metadata.class, MetadataCoderV2.of());
pipeline.getCoderRegistry().registerCoderForClass(FileIO.ReadableFile.class, ReadableFileCoderv2.of());
pipeline.apply(FileIO.match().filepattern(path)
    .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
// rest of the pipeline
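To illustrate why the coder choice matters, here is a minimal sketch using only the JDK. FileInfo, encode and decode are hypothetical stand-ins, not Beam classes or Beam's actual wire format; the only assumption carried over from Beam is that the v1 coder does not serialize the last-modified timestamp while the v2 coder does. A value that is round-tripped without the timestamp comes back with the default 0, which is the symptom described in the question.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins that only mimic the shape of the problem; not Beam code.
class CoderVersionSketch {

  /** Simplified stand-in for MatchResult.Metadata. */
  static final class FileInfo {
    final String resourceId;
    final long sizeBytes;
    final long lastModifiedMillis;

    FileInfo(String resourceId, long sizeBytes, long lastModifiedMillis) {
      this.resourceId = resourceId;
      this.sizeBytes = sizeBytes;
      this.lastModifiedMillis = lastModifiedMillis;
    }
  }

  /** Writes id and size; the timestamp is written only in "v2" mode. */
  static byte[] encode(FileInfo info, boolean withLastModified) {
    byte[] id = info.resourceId.getBytes(StandardCharsets.UTF_8);
    int length = Integer.BYTES + id.length + Long.BYTES + (withLastModified ? Long.BYTES : 0);
    ByteBuffer buffer = ByteBuffer.allocate(length);
    buffer.putInt(id.length).put(id).putLong(info.sizeBytes);
    if (withLastModified) {
      buffer.putLong(info.lastModifiedMillis);
    }
    return buffer.array();
  }

  /** Reads id and size; in "v1" mode the timestamp cannot be recovered and defaults to 0. */
  static FileInfo decode(byte[] data, boolean withLastModified) {
    ByteBuffer buffer = ByteBuffer.wrap(data);
    byte[] id = new byte[buffer.getInt()];
    buffer.get(id);
    long sizeBytes = buffer.getLong();
    long lastModifiedMillis = withLastModified ? buffer.getLong() : 0L;
    return new FileInfo(new String(id, StandardCharsets.UTF_8), sizeBytes, lastModifiedMillis);
  }

  public static void main(String[] args) {
    FileInfo original = new FileInfo("fileName", 108L, 1_650_000_000_000L);
    // "v1" round trip: the timestamp is lost.
    System.out.println(decode(encode(original, false), false).lastModifiedMillis);
    // "v2" round trip: the timestamp survives.
    System.out.println(decode(encode(original, true), true).lastModifiedMillis);
  }
}
```

Any step of the pipeline that re-encodes the element with the v1-style coder loses the timestamp, which is why both Metadata and ReadableFile need a v2-aware coder.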
I opened a PR for Beam to create ReadableFileCoderv2, but until it is merged we have to rely on some reflection, since FileIO.ReadableFile has no public way to create a new instance.
public class ReadableFileCoderv2 extends ReadableFileCoder {
  private static final ReadableFileCoderv2 INSTANCE = new ReadableFileCoderv2();

  public static ReadableFileCoderv2 of() {
    return INSTANCE;
  }

  @Override
  public void encode(FileIO.ReadableFile value, OutputStream os) throws IOException {
    MetadataCoderV2.of().encode(value.getMetadata(), os);
    VarIntCoder.of().encode(value.getCompression().ordinal(), os);
  }

  @Override
  public FileIO.ReadableFile decode(InputStream is) throws IOException {
    MatchResult.Metadata metadata = MetadataCoderV2.of().decode(is);
    Compression compression = Compression.values()[VarIntCoder.of().decode(is)];
    try {
      // `unsafe` is not defined in this snippet; it is presumably a sun.misc.Unsafe instance obtained elsewhere.
      // allocateInstance creates the object without calling a constructor, so both fields are set reflectively.
      FileIO.ReadableFile readableFile = (FileIO.ReadableFile) unsafe.allocateInstance(FileIO.ReadableFile.class);
      Field fieldMetadata = FileIO.ReadableFile.class.getDeclaredField("metadata");
      fieldMetadata.setAccessible(true);
      fieldMetadata.set(readableFile, metadata);
      Field fieldCompression = FileIO.ReadableFile.class.getDeclaredField("compression");
      fieldCompression.setAccessible(true);
      fieldCompression.set(readableFile, compression);
      return readableFile;
    } catch (InstantiationException | NoSuchFieldException | IllegalAccessException e) {
      // The stream has already been consumed above, so falling back to super.decode(is) would read the wrong bytes.
      throw new IOException("Could not instantiate FileIO.ReadableFile reflectively", e);
    }
  }
}
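The coder above refers to an `unsafe` field that the snippet does not define. One common way to obtain such an instance, sketched here with the JDK only, is to read the private static `theUnsafe` field of sun.misc.Unsafe. The Sealed class, lookupUnsafe and build are hypothetical names used for illustration; Sealed stands in for a class like FileIO.ReadableFile whose constructor cannot be called. This is a sketch of the technique, not necessarily how the answer's author obtained the instance.

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

class UnsafeAllocationSketch {

  /** Hypothetical stand-in for a class whose constructor is not accessible to callers. */
  static final class Sealed {
    private final String name;
    private final long stamp;

    private Sealed(String name, long stamp) {
      this.name = name;
      this.stamp = stamp;
    }

    String name() {
      return name;
    }

    long stamp() {
      return stamp;
    }
  }

  /** Looks up the JVM's singleton Unsafe through its private static field. */
  static Unsafe lookupUnsafe() {
    try {
      Field field = Unsafe.class.getDeclaredField("theUnsafe");
      field.setAccessible(true);
      return (Unsafe) field.get(null);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("sun.misc.Unsafe is not available", e);
    }
  }

  /** Allocates a Sealed without running its constructor, then fills the fields reflectively. */
  static Sealed build(String name, long stamp) {
    try {
      Sealed instance = (Sealed) lookupUnsafe().allocateInstance(Sealed.class);
      setField(instance, "name", name);
      setField(instance, "stamp", stamp);
      return instance;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Reflective construction failed", e);
    }
  }

  private static void setField(Object target, String fieldName, Object value)
      throws ReflectiveOperationException {
    Field field = target.getClass().getDeclaredField(fieldName);
    field.setAccessible(true);
    field.set(target, value);
  }

  public static void main(String[] args) {
    Sealed sealed = build("fileName", 1_650_000_000_000L);
    System.out.println(sealed.name() + " " + sealed.stamp());
  }
}
```

Because this depends on private field names and an unsupported JDK API, it can break when either the target class or the JDK changes, which is a good reason to prefer a coder provided by the library once one is available.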
Solution 2:[2]
Now that the PR in the previous answer has been merged, you can do:
MetadataCoderV2 metadataCoder = MetadataCoderV2.of();
p.getCoderRegistry().registerCoderForClass(Metadata.class, metadataCoder);
p.getCoderRegistry().registerCoderForClass(FileIO.ReadableFile.class, ReadableFileCoder.of(metadataCoder));
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Matt Martin |
