Flink SQL client does not create new files in AWS S3 bucket when using batch execution mode
We are using the Flink SQL client to sink data from various sources to S3 buckets. In batch execution mode, with checkpointing disabled, an INSERT INTO ... SELECT statement does not create any files in the S3 bucket. The SQL client logs do not provide any additional information.
The same set of SQL queries works fine when we switch to streaming execution mode. However, we cannot use “INSERT OVERWRITE” in streaming mode.
Any help or feedback would be greatly appreciated. Has anyone encountered a similar issue?
Flink release: 1.13
SQL queries:
CREATE TABLE test_emp (
name STRING,
empId STRING,
company STRING
)
WITH (
'connector' = 'filesystem',
'path' = 'file:///home/constgops/Ajay/test',
'format' = 'csv'
);
CREATE TABLE s3_emp (
name STRING,
empId STRING,
company STRING
)
WITH (
'connector' = 'filesystem',
'path' = 's3a://poc-datalake-us-east-1/signaling/test',
'format' = 'parquet'
);
SET execution.runtime-mode = 'batch';
INSERT INTO s3_emp (name, empId, company) SELECT name, empId, company from test_emp;
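For context, a minimal sketch of the batch-mode INSERT OVERWRITE that motivates using batch mode here. It assumes the quoted `SET 'key' = 'value'` form shown in the Flink 1.13 SQL client documentation and reuses the test_emp and s3_emp tables defined above. It illustrates the intended statement only and is not a confirmed fix for the missing files.

```sql
-- Sketch only: run in the Flink SQL client after the two CREATE TABLE statements above.
-- Quoted key/value form of SET, as shown in the Flink 1.13 SQL client documentation.
SET 'execution.runtime-mode' = 'batch';

-- INSERT OVERWRITE replaces the existing data in the sink table.
-- As noted above, it is not available in streaming mode.
INSERT OVERWRITE s3_emp
SELECT name, empId, company
FROM test_emp;
```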
Logs:
2022-05-16 20:00:44,805 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@63c3188
2022-05-16 20:00:44,805 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'filesystem': (checkpoints "file:/home/constgops/Abhijit/Flink/flink-1.13.1/flink-checkpoints")
2022-05-16 20:00:44,805 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from DEPLOYING to INITIALIZING.
2022-05-16 20:00:44,838 INFO org.apache.hadoop.conf.Configuration.deprecation [] - io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
2022-05-16 20:00:44,840 WARN org.apache.flink.metrics.MetricGroup [] - The operator name Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) exceeded the 80 characters length limit and was truncated.
2022-05-16 20:00:44,841 WARN org.apache.flink.runtime.state.StateBackendLoader [] - filesystem state backend has been deprecated. Please use 'hashmap' state backend instead.
2022-05-16 20:00:44,841 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@19efb7c3
2022-05-16 20:00:45,103 WARN org.apache.flink.runtime.state.StateBackendLoader [] - filesystem state backend has been deprecated. Please use 'hashmap' state backend instead.
2022-05-16 20:00:45,103 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend is set to heap memory org.apache.flink.runtime.state.hashmap.HashMapStateBackend@5ef29704
2022-05-16 20:00:45,105 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from INITIALIZING to RUNNING.
2022-05-16 20:00:45,201 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2) switched from RUNNING to FINISHED.
2022-05-16 20:00:45,201 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 (7b42630a41eac7ed55a72cdaf1bd29b2).
2022-05-16 20:00:45,202 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, test_emp]], fields=[name, empId, company]) -> Sink: Filesystem (1/1)#0 7b42630a41eac7ed55a72cdaf1bd29b2.
2022-05-16 20:00:45,991 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:10, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=816.640mb (856309094 bytes), taskOffHeapMemory=102.400mb (107374182 bytes), managedMemory=655.360mb (687194777 bytes), networkMemory=51.200mb (53687091 bytes)}, allocationId: 2daf310333aaed54b7f418886b00abb4, jobId: 6e4c753a14baa10dd3f5bc6553c4eb38).
2022-05-16 20:00:45,992 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 6e4c753a14baa10dd3f5bc6553c4eb38 from job leader monitoring.
2022-05-16 20:00:45,992 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 6e4c753a14baa10dd3f5bc6553c4eb38.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
