Skip message in Kafka serialization schema if any problems occur
I have a simple Apache Flink job that ends in a Kafka sink. I'm using a KafkaRecordSerializationSchema<CustomType> to serialize the messages coming from the previous (RichFlatMap) operator:
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema.KafkaSinkContext;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CustomTypeSerializationSchema implements KafkaRecordSerializationSchema<CustomType> {

    private static final Logger logger = LoggerFactory.getLogger(CustomTypeSerializationSchema.class);
    private static final long serialVersionUID = 5743933755381724692L;

    private final String topic;

    public CustomTypeSerializationSchema(final String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(final CustomType input, final KafkaSinkContext context,
            final Long timestamp) {
        final var result = new CustomMessage(input);
        try {
            return new ProducerRecord<>(topic,
                    JacksonJsonMapper.writeValueAsString(result).getBytes(StandardCharsets.UTF_8));
        } catch (final Exception e) {
            logger.warn("Unable to serialize message [{}]. This was the reason:", result, e);
        }
        // Fallback: an "empty" record still goes out, which is exactly what I want to avoid.
        return new ProducerRecord<>(topic, new byte[0]);
    }
}
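For completeness, the schema is plugged into the sink roughly like this (a minimal sketch: the broker address, topic name, and the stream variable are placeholders for my actual setup):

import org.apache.flink.connector.kafka.sink.KafkaSink;

// `stream` is the DataStream<CustomType> produced by the RichFlatMap operator.
final KafkaSink<CustomType> sink = KafkaSink.<CustomType>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(new CustomTypeSerializationSchema("output-topic"))
        .build();
stream.sinkTo(sink);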
What I'm trying to avoid is sending an "empty" ProducerRecord, like the fallback one returned when something goes wrong inside the try-catch. Basically, I'm looking for behavior similar to KafkaRecordDeserializationSchema, where only what's put in the collector is received by subsequent operators and everything else is discarded (see the sketch below).
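For reference, this is the collector-based pattern I mean on the consuming side (a minimal sketch; I'm assuming here that my JacksonJsonMapper wrapper also exposes a readValue method, which isn't shown above):

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class CustomTypeDeserializationSchema implements KafkaRecordDeserializationSchema<CustomType> {

    private static final long serialVersionUID = 1L;

    @Override
    public void deserialize(final ConsumerRecord<byte[], byte[]> record, final Collector<CustomType> out)
            throws IOException {
        try {
            // Only records handed to the collector reach the downstream operators.
            out.collect(JacksonJsonMapper.readValue(record.value(), CustomType.class));
        } catch (final Exception e) {
            // Not collecting anything here means the broken record is simply skipped.
        }
    }

    @Override
    public TypeInformation<CustomType> getProducedType() {
        return TypeInformation.of(CustomType.class);
    }
}

On that side, not calling out.collect(...) is all it takes to drop a record; I'd like an equivalent escape hatch on the producing side.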
Is there a way to achieve this with another *SerializationSchema type?