Has anyone faced this `Timestamp should always be non-negative or null.` error while publishing events to a Kafka topic from a Flink app?
I am getting this error while publishing messages to a Kafka topic from a Flink app. The same code works in our test environment with a similar producer configuration, but fails in production. I couldn't find the cause of this issue.
```
2022-04-15 16:51:37,228 thread="Sink: spend-limit-publisher-sink (2/2)#15" level=WARN logger=o.a.f.r.t.Task - Sink: spend-limit-publisher-sink (2/2)#15 (d7b4646c840c3882bb784125393b484a) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:80)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:72)
at com.ex******.streamplatform.sdk.kafka.flink.KafkaSerializationSchemaAdapter.serialize(KafkaSerializationSchemaAdapter.java:37)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
```
Solution 1:[1]
I figured out the issue was caused by the lack of a timestamp/watermark assignment before the sink, so the record timestamp defaulted to Long.MIN_VALUE (-9223372036854775808).
Fixed it by adding a watermark strategy:
```java
limitedStream1.union(limitedStream2)
    .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
        .withTimestampAssigner((id, streamRecordTimestamp) -> Instant.now().toEpochMilli()))
    .map(new DataMapper())
    .filter(new NotNull<>())
    .addSink(connectorFactory.createProducer(String.class, EventRecord.class));
```
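An alternative defensive fix is to sanitize the timestamp inside the serialization schema itself, so the sink never passes Flink's "no timestamp" sentinel (`Long.MIN_VALUE`) to `ProducerRecord`. The sketch below is a hypothetical helper (not part of the asker's SDK): it maps any negative timestamp to `null`, which Kafka accepts and replaces with the producer/broker time.

```java
// Minimal sketch (assumed helper, not the asker's SDK code): guard against
// Flink's "no timestamp" sentinel before constructing a Kafka ProducerRecord.
public class TimestampGuard {

    // Flink uses Long.MIN_VALUE internally when an element has no event-time
    // timestamp, e.g. when no watermark/timestamp assigner is configured.
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    // Return null so the Kafka producer assigns its own timestamp instead of
    // throwing IllegalArgumentException on a negative value.
    static Long sanitize(long flinkTimestamp) {
        return flinkTimestamp < 0 ? null : flinkTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(sanitize(NO_TIMESTAMP));   // prints: null
        System.out.println(sanitize(1650041497228L)); // prints: 1650041497228
    }
}
```

The sanitized value would then be passed as the `timestamp` argument of the `ProducerRecord(topic, partition, timestamp, key, value)` constructor, whose validation is what raised the exception in the stack trace above.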
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | SanS |
