Flink KafkaSource "Record is corrupt" error

I am getting an error in one of my jobs after some changes on the Kafka broker (a new release). Not all of my jobs are affected by the problem.

The job consumes data from a Kafka topic, processes it, and saves the result in an Oracle database.

The error message is the following:

java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:351) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.11-1.14.4.jar:1.14.4]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_322]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
... 1 more
Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from topic-name-2. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1519) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[?:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97) ~[?:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_322]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
... 1 more
Caused by: org.apache.kafka.common.KafkaException: Record batch for partition topic-name-2 at offset 27254832 is invalid, cause: Record is corrupt (stored crc = 341617725, computed crc = 1191115681)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.maybeEnsureValid(Fetcher.java:1432) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1476) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1533) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676) ~[?:?]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240) ~[?:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[?:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:97) ~[?:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.11-1.14.4.jar:1.14.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_322]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
... 1 more

As a consumer, I am using a KafkaSource with a SimpleStringSchema deserializer:

KafkaSource<String> source = KafkaSource.<String>builder()
        // connection and consumer settings come from the external configuration
        .setProperties(configManager.getConsumerProperties())
        // subscribe to every topic matching the configured pattern
        .setTopicPattern(java.util.regex.Pattern.compile(configManager.getConsumerTopic()))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
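
The source is then attached to the job roughly like this (a minimal sketch; the processing step and the Oracle sink are omitted):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// attach the KafkaSource; watermarks are not needed for this job
DataStream<String> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");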

Is it possible to skip such records? I have only found an answer for an older version that refers to the FlinkKafkaConsumer.
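
Note that the CRC failure above is raised inside the Kafka client while fetching, before Flink's deserializer is ever called, so a tolerant deserializer alone probably would not help here. For records that merely fail to deserialize, though, a schema that drops bad records instead of failing the job might look like this (a sketch; SkippingStringSchema is a hypothetical name):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

public class SkippingStringSchema implements DeserializationSchema<String> {

    private final SimpleStringSchema inner = new SimpleStringSchema();

    @Override
    public void deserialize(byte[] message, Collector<String> out) {
        try {
            out.collect(inner.deserialize(message));
        } catch (Exception e) {
            // drop the record instead of failing the job
        }
    }

    @Override
    public String deserialize(byte[] message) {
        // the source uses the Collector variant above; this is the plain fallback
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}

It would be passed to setValueOnlyDeserializer(...) in place of new SimpleStringSchema().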

EDIT: After I changed some consumer parameters, the error message changed, which may give an indication of what is wrong.

Changed parameters:

isolation.level=read_committed
check.crcs=false
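
These were set on the properties passed into the builder, roughly like this (assuming configManager.getConsumerProperties() returns a mutable java.util.Properties):

Properties props = configManager.getConsumerProperties();
// only read records from committed transactions
props.setProperty("isolation.level", "read_committed");
// disable the client-side CRC check that triggered the original "Record is corrupt" error
props.setProperty("check.crcs", "false");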

Caused by: org.apache.kafka.common.InvalidRecordException: Found invalid number of record headers -49
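
The original KafkaException suggests seeking past the record. One way to do that is to commit an offset just beyond the corrupt record for the job's consumer group with a standalone KafkaConsumer, e.g. (a sketch with placeholder broker address and group id; it assumes "topic-name-2" in the trace means partition 2 of topic "topic-name"):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("group.id", "my-flink-group");       // placeholder: the group the job uses
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("topic-name", 2);
            consumer.assign(Collections.singletonList(tp));
            // commit the position right after the corrupt record at offset 27254832
            consumer.commitSync(
                    Collections.singletonMap(tp, new OffsetAndMetadata(27254833L)));
        }
    }
}

Caveat: when the job restores from a checkpoint or savepoint, Flink uses the offsets stored there rather than the committed group offsets, so this would only take effect when starting fresh (or with OffsetsInitializer.committedOffsets()).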


