'Apache Flink batch mode FileSink to S3 can't finish in jetbrains
What we are trying to do: we are evaluating Flink to perform batch processing using DataStream API in BATCH mode. Minimal application to reproduce the issue:
FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")))
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
val inputStream = env.fromSource(
FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://testtest/2022/04/12/")).build(), WatermarkStrategy.noWatermarks()
.withTimestampAssigner(new SerializableTimestampAssigner[String]() {
override def extractTimestamp(element: String, recordTimestamp: Long): Long = -1
}), "MySourceName"
)
.map(str => {
val jsonNode = JsonUtil.getJSON(str)
val log = JsonUtil.getJSONString(jsonNode, "log")
if (StringUtils.isNotBlank(log)) {
log
} else {
""
}
})
.filter(StringUtils.isNotBlank(_))
val sink: FileSink[BaseLocation] = FileSink
// .forBulkFormat(new Path("/Users/temp/flinksave"), AvroWriters.forSpecificRecord(classOf[BaseLocation]))
.forBulkFormat(new Path("s3://testtest/avro"), AvroWriters.forSpecificRecord(classOf[BaseLocation]))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build()
inputStream.map(data => {
val baseLocation = new BaseLocation()
baseLocation.setRegion(data)
baseLocation
}).sinkTo(sink)
inputStream.print("input:")
env.execute()
Flink version: 1.14.2
- the program executes normally when the path is local.
- The program does not give a error when path change to
s3://. However I do not see any files being written in S3 either. - This problem does not exist in the stand-alone mode, but only in the local development environment jetbrains IDEA. Is it because I lack configuration? I have already configured flink-config.yaml like:
s3.access-key: test
s3.secret-key: test
s3.endpoint: http://127.0.0.1:39000
log
18:42:25,524 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000002]
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Finished reading split(s) [0000000001]
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Closing splitFetcher 0 because it is idle.
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Shutting down split fetcher 0
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Split fetcher 0 exited.
18:42:25,525 INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 11 (on host '') is requesting a file source split
18:42:25,525 INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 11
18:42:25,525 INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - Subtask 8 (on host '') is requesting a file source split
18:42:25,525 INFO org.apache.flink.connector.file.src.impl.StaticFileSplitEnumerator [] - No more splits available for subtask 8
18:42:25,525 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
18:42:25,526 INFO org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Reader received NoMoreSplits event.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
