Apache Flink batch mode FileSink to S3 can't finish in JetBrains IDEA

What we are trying to do: we are evaluating Flink for batch processing using the DataStream API in BATCH mode. Here is a minimal application that reproduces the issue:

    import org.apache.commons.lang3.StringUtils
    import org.apache.flink.api.common.RuntimeExecutionMode
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.configuration.GlobalConfiguration
    import org.apache.flink.connector.file.sink.FileSink
    import org.apache.flink.connector.file.src.FileSource
    import org.apache.flink.connector.file.src.reader.TextLineFormat
    import org.apache.flink.core.fs.{FileSystem, Path}
    import org.apache.flink.formats.avro.AvroWriters
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy
    import org.apache.flink.streaming.api.scala._

    // Initialize filesystems from the flink-conf.yaml found in FLINK_CONF_DIR.
    FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")))
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    val inputStream = env.fromSource(
      FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://testtest/2022/04/12/")).build(),
      WatermarkStrategy.noWatermarks()
        .withTimestampAssigner(new SerializableTimestampAssigner[String]() {
          override def extractTimestamp(element: String, recordTimestamp: Long): Long = -1
        }),
      "MySourceName"
    )
      // JsonUtil is our own helper; extract the "log" field from each JSON line.
      .map(str => {
        val jsonNode = JsonUtil.getJSON(str)
        val log = JsonUtil.getJSONString(jsonNode, "log")
        if (StringUtils.isNotBlank(log)) {
          log
        } else {
          ""
        }
      })
      .filter(StringUtils.isNotBlank(_))
    // `config` was referenced but not defined in the original snippet; a default is assumed here.
    val config = OutputFileConfig.builder().build()
    // BaseLocation is our Avro-generated specific record class.
    val sink: FileSink[BaseLocation] = FileSink
      // Writing to a local path works fine:
      //      .forBulkFormat(new Path("/Users/temp/flinksave"), AvroWriters.forSpecificRecord(classOf[BaseLocation]))
      .forBulkFormat(new Path("s3://testtest/avro"), AvroWriters.forSpecificRecord(classOf[BaseLocation]))
      .withRollingPolicy(OnCheckpointRollingPolicy.build())
      .withOutputFileConfig(config)
      .build()
    inputStream.map(data => {
      val baseLocation = new BaseLocation()
      baseLocation.setRegion(data)
      baseLocation
    }).sinkTo(sink)
    inputStream.print("input:")
    env.execute()
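One difference I am aware of compared to running on a cluster: there, S3 filesystems are loaded through Flink's plugin mechanism rather than from the classpath. Below is a sketch of the plugin-aware initialization variant; it assumes the flink-s3-fs-hadoop jar sits in the plugins directory that FLINK_PLUGINS_DIR points to, which is an assumption about the local setup, not something shown above:

    import org.apache.flink.configuration.GlobalConfiguration
    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.core.plugin.PluginUtils

    // Load flink-conf.yaml from FLINK_CONF_DIR, then initialize the filesystems
    // together with the plugin manager so that plugin-based filesystems such as
    // flink-s3-fs-hadoop can be discovered when running outside a cluster.
    val conf = GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR"))
    FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf))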

Flink version: 1.14.2

  1. The program executes normally when the path is local.
  2. The program does not raise an error when the path is changed to s3://, but I do not see any files being written to S3 either.
  3. The problem does not occur in stand-alone mode, only when running locally in the JetBrains IDEA development environment. Is it because I am missing some configuration? I have already configured flink-conf.yaml as follows (one more candidate key is sketched after the snippet):

    s3.access-key: test
    s3.secret-key: test
    s3.endpoint: http://127.0.0.1:39000
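Since the endpoint is a local S3-compatible store, I suspect path-style access may also be relevant; a sketch of the extra key (an assumption on my part, not in my current file):

    # assumption: local S3-compatible endpoints usually require path-style access
    s3.path.style.access: true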

Log:

    18:42:25,524 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000002]
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000001]
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
    18:42:25,525 INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 11 (on host '') is requesting a file source split
    18:42:25,525 INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 11
    18:42:25,525 INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 8 (on host '') is requesting a file source split
    18:42:25,525 INFO  org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 8
    18:42:25,525 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
    18:42:25,526 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.

