Exception while writing an empty multipart CSV file from Apache Beam into NetApp StorageGRID

Problem Statement

We consume multiple CSV files into PCollections, apply Beam SQL to transform the data, and then write the resulting PCollection. This works fine as long as all source PCollections contain data and the Beam SQL transform produces a non-empty collection. When the transform produces an empty PCollection and we write it to NetApp StorageGRID, the write fails with the exception below:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
    at ECSOperations.main(ECSOperations.java:53)
Caused by: java.io.IOException: Failed closing channel to s3://bucket-name/.temp-beam-847f362f-8884-454e-bfbe-baf9a4e32806/0b72948a5fcccb28-174f-426b-a225-ae3d3cbb126f
    at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1076)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.createMissingEmptyShards(FileBasedSink.java:759)
    at org.apache.beam.sdk.io.FileBasedSink$WriteOperation.finalizeDestination(FileBasedSink.java:639)
    at org.apache.beam.sdk.io.WriteFiles$FinalizeTempFileBundles$FinalizeFn.process(WriteFiles.java:1040)
Caused by: java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: Your proposed upload is smaller than the minimum allowed object size. (Service: Amazon S3; Status Code: 400; Error Code: EntityTooSmall; Request ID: 1643869619144605; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null
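For context, the read -> Beam SQL -> write flow described above looks roughly like the sketch below. The schema, the SQL query, and the file paths are illustrative placeholders, not taken from our actual code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PipelineSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Illustrative schema for the CSV rows (placeholder fields).
    final Schema schema = Schema.builder()
        .addStringField("name")
        .addInt32Field("age")
        .build();

    // 1. Read CSV lines and parse them into Rows with the schema attached.
    PCollection<Row> rows = p
        .apply(TextIO.read().from("src/main/resources/input*.csv"))
        .apply(MapElements.via(new SimpleFunction<String, Row>() {
          @Override
          public Row apply(String line) {
            String[] parts = line.split(",");
            return Row.withSchema(schema)
                .addValues(parts[0], Integer.parseInt(parts[1]))
                .build();
          }
        }))
        .setRowSchema(schema);

    // 2. Apply Beam SQL; if the filter matches nothing, the result is empty.
    PCollection<Row> transformed =
        rows.apply(SqlTransform.query("SELECT name, age FROM PCOLLECTION WHERE age > 30"));

    // 3. Convert back to CSV lines and write to StorageGRID via the S3 filesystem.
    transformed
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((Row row) -> row.getString("name") + "," + row.getInt32("age")))
        .apply(TextIO.write().to("s3://bucket-name/output.csv").withoutSharding());

    p.run().waitUntilFinish();
  }
}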

The following is a minimal sample that reproduces the problem:

ECSOptions options = PipelineOptionsFactory.fromArgs(args).as(ECSOptions.class);
setupConfiguration(options);
Pipeline p = Pipeline.create(options);
// empty.csv contains no lines, so the resulting PCollection is empty
PCollection<String> pSource = p.apply(TextIO.read().from("src/main/resources/empty.csv"));
// writing the empty PCollection to StorageGRID triggers the EntityTooSmall error
pSource.apply(TextIO.write().to("s3://bucket-name/empty.csv").withoutSharding());
p.run();

Observation

  • Writing a simple (non-multipart) file works fine, i.e. a single PutObject to StorageGRID succeeds.
  • This seems to be a known issue with StorageGRID, but we want to check whether it can be handled from the Beam pipeline.

What I have tried

  • Tried to check the size of the PCollection before writing and emit a placeholder string into the output file, but because the PCollection is empty, the PTransform is never invoked.
  • Tried Count.globally as well, but that did not help either (a sketch of this side-input approach follows this list).
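The Count.globally attempt looked roughly like the sketch below (method, step, and placeholder names are illustrative): the element count is exposed as a singleton side input, and a placeholder element is emitted only when the count is zero, so the writer always receives at least one element.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;

public class WritePlaceholderIfEmpty {

  // 'result' is the (possibly empty) PCollection<String> produced by the transform step.
  static void writeWithPlaceholder(Pipeline p, PCollection<String> result) {
    // Expose the element count as a singleton side input.
    final PCollectionView<Long> countView =
        result.apply(Count.globally()).apply(View.asSingleton());

    // A single seed element drives a DoFn that emits a placeholder line
    // only when the main collection turned out to be empty.
    PCollection<String> placeholder = p
        .apply("Seed", Create.of("unused"))
        .apply("EmitIfEmpty", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.sideInput(countView) == 0L) {
              c.output("");  // placeholder content for an otherwise empty file
            }
          }
        }).withSideInputs(countView));

    // Flatten so the writer always receives at least one element.
    PCollectionList.of(result).and(placeholder)
        .apply(Flatten.pCollections())
        .apply(TextIO.write().to("s3://bucket-name/output.csv").withoutSharding());
  }
}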

Ask

  • Is there any way to handle this in Beam, e.g. check the size of the PCollection before writing and skip the write entirely when it is empty, so that this issue is avoided?
  • Has anyone faced a similar issue and managed to resolve it?


Solution 1:[1]

You can't check whether the PCollection is empty during pipeline construction, as it has not been computed yet. If this filesystem can't support empty files, you could try writing to another filesystem and then copying only if the file is not empty (assuming the file in question is not too large).
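A minimal sketch of that workaround, assuming the AWS SDK v1 (com.amazonaws) client is on the classpath; the local path, bucket, key, endpoint, and region are placeholders. The pipeline writes to the local filesystem, and the file is uploaded to StorageGRID with a single PutObject only if it is non-empty:

import java.io.File;

import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WriteThenCopyIfNonEmpty {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Write to the local filesystem first; an empty output file is fine here.
    p.apply(TextIO.read().from("src/main/resources/empty.csv"))
     .apply(TextIO.write().to("/tmp/output.csv").withoutSharding());
    p.run().waitUntilFinish();

    // Upload to StorageGRID only if the file actually has content,
    // using a single PutObject rather than a multipart upload.
    File local = new File("/tmp/output.csv");
    if (local.exists() && local.length() > 0) {
      AmazonS3 s3 = AmazonS3ClientBuilder.standard()
          .withEndpointConfiguration(
              new AwsClientBuilder.EndpointConfiguration(
                  "https://storagegrid.example.com", "us-east-1"))  // placeholder endpoint/region
          .build();
      s3.putObject("bucket-name", "empty.csv", local);
    }
  }
}

This keeps the intermediate write on a filesystem that tolerates zero-byte files and turns the upload into a plain PutObject, which, per the observation above, works against StorageGRID.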

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: robertwb