Flink SQL client does not create new files in AWS S3 bucket when using batch execution mode

We are using the Flink SQL client to sink data from various sources into S3 buckets, and we are running into issues when writing to an S3 bucket in batch execution mode (checkpointing disabled). When we run an INSERT INTO ... SELECT, no files are created in the S3 bucket, and the SQL client logs do not provide any additional information.

The same set of SQL queries works fine when we switch to streaming execution mode. However, we cannot use INSERT OVERWRITE in streaming mode.
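For reference, this is the kind of batch-only statement we ultimately want to run (the planner rejects INSERT OVERWRITE when the runtime mode is streaming):

SET execution.runtime-mode = 'batch';

-- INSERT OVERWRITE replaces the existing contents of the target path;
-- it is only supported in batch execution mode.
INSERT OVERWRITE s3_emp SELECT name, empId, company FROM test_emp;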

Has anyone encountered similar issues? Any help or feedback would be greatly appreciated.

Flink release: 1.13

SQL queries:

CREATE TABLE test_emp (
  name STRING,
  empId STRING,
  company STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///home/constgops/Ajay/test',
  'format' = 'csv'
);


CREATE TABLE s3_emp (
  name STRING,
  empId STRING,
  company STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://poc-datalake-us-east-1/signaling/test',
  'format' = 'parquet'
);

SET execution.runtime-mode = 'batch';

INSERT INTO s3_emp (name, empId, company) SELECT name, empId, company from test_emp;
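For comparison, the streaming variant does produce files for us. Note that with a bulk format like Parquet, the filesystem sink only finalizes part files on checkpoints, so checkpointing has to be enabled in streaming mode — a sketch (the 30s interval is an example value, not what we necessarily use):

SET execution.runtime-mode = 'streaming';

-- With bulk formats (parquet), part files are only committed on checkpoint,
-- so a checkpointing interval must be configured in streaming mode.
SET execution.checkpointing.interval = '30s';

INSERT INTO s3_emp SELECT name, empId, company FROM test_emp;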

Logs:

2022-05-16 20:00:44,805 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@63c3188
2022-05-16 20:00:44,805 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'filesystem': (checkpoints "file:/home/constgops/Abhijit/Flink/flink-1.13.1/flink-checkpoints")
2022-05-16 20:00:44,805 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from DEPLOYING to INITIALIZING.
2022-05-16 20:00:44,838 INFO  org.apache.hadoop.conf.Configuration.deprecation             [] - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2022-05-16 20:00:44,840 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) exceeded the 80 characters length limit and was truncated.
2022-05-16 20:00:44,841 WARN  org.apache.flink.runtime.state.StateBackendLoader            [] - filesystem state backend has been deprecated. Please use 'hashmap' state backend instead.
2022-05-16 20:00:44,841 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@19efb7c3
2022-05-16 20:00:45,103 WARN  org.apache.flink.runtime.state.StateBackendLoader            [] - filesystem state backend has been deprecated. Please use 'hashmap' state backend instead.
2022-05-16 20:00:45,103 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5ef29704
2022-05-16 20:00:45,105 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from INITIALIZING to RUNNING.
2022-05-16 20:00:45,201 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from RUNNING to FINISHED.
2022-05-16 20:00:45,201 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2).
2022-05-16 20:00:45,202 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 7b42630a41eac7ed55a72cdaf1bd29b2.
2022-05-16 20:00:45,991 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:10, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=816.640mb (856309094 bytes), taskOffHeapMemory=102.400mb (107374182 bytes), managedMemory=655.360mb (687194777 bytes), networkMemory=51.200mb (53687091 bytes)}, allocationId: 2daf310333aaed54b7f418886b00abb4, jobId: 6e4c753a14baa10dd3f5bc6553c4eb38).
2022-05-16 20:00:45,992 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 6e4c753a14baa10dd3f5bc6553c4eb38 from job leader monitoring.
2022-05-16 20:00:45,992 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job 6e4c753a14baa10dd3f5bc6553c4eb38.


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
