Skip message in Kafka serialization schema if any problems occur

I have a simple Apache Flink job that ends with a Kafka sink. I'm using a KafkaRecordSerializationSchema<CustomType> to handle messages coming from the previous (RichFlatMap) operator:

import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CustomTypeSerializationSchema implements KafkaRecordSerializationSchema<CustomType> {
  private static final long serialVersionUID = 5743933755381724692L;
  private static final Logger logger = LoggerFactory.getLogger(CustomTypeSerializationSchema.class);

  private final String topic;

  public CustomTypeSerializationSchema(final String topic) {
    this.topic = topic;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(final CustomType input, final KafkaSinkContext context,
      final Long timestamp) {
    final var result = new CustomMessage(input);
    try {
      return new ProducerRecord<>(topic,
          JacksonJsonMapper.writeValueAsString(result).getBytes(StandardCharsets.UTF_8));
    } catch (final Exception e) {
      logger.warn("Unable to serialize message [{}]. This was the reason:", result, e);
    }
    // Fallback path: this still emits an "empty" record to Kafka, which is
    // exactly what I want to avoid.
    return new ProducerRecord<>(topic, new byte[0]);
  }
}

What I'm trying to avoid is sending an "empty" ProducerRecord — the one returned by the fallback path when the try-catch fails. Basically, I'm looking for behavior similar to KafkaRecordDeserializationSchema, where only what's put into the collector is received by subsequent operators, and everything else is discarded.

Is there a way to achieve this with another *SerializationSchema type?
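One common workaround (a sketch of the pattern, not a Flink-provided API) is to move the fallible serialization step upstream into a flatMap operator, which does have a collector: records that serialize successfully are emitted as byte[], failures are logged and simply not collected, and the sink's serialization schema becomes a trivial pass-through. The collector pattern can be illustrated without any Flink dependency; `serializeOrSkip` and the JSON string it builds are hypothetical stand-ins for the question's JacksonJsonMapper call:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SkipOnErrorDemo {
  // Stand-in for Flink's Collector<byte[]>: only successfully serialized
  // payloads are handed to the consumer; failures are logged and dropped,
  // so no "empty" record ever reaches the sink.
  static void serializeOrSkip(String input, Consumer<byte[]> out) {
    try {
      if (input == null) {
        throw new IllegalArgumentException("null record");
      }
      // Hypothetical serialization step; the question's
      // JacksonJsonMapper.writeValueAsString(result) would go here.
      out.accept(("{\"value\":\"" + input + "\"}").getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
      System.err.println("Skipping unserializable record: " + e.getMessage());
    }
  }

  public static void main(String[] args) {
    List<byte[]> sinkInput = new ArrayList<>();
    for (String record : new String[] {"ok", null, "also-ok"}) {
      serializeOrSkip(record, sinkInput::add);
    }
    // Only the two valid records reach the sink.
    System.out.println(sinkInput.size());
  }
}
```

In a real job, the same logic would live in a RichFlatMapFunction<CustomType, byte[]> whose flatMap only calls collector.collect(...) inside the try block, leaving the KafkaRecordSerializationSchema to wrap already-valid bytes in a ProducerRecord.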



Source: Stack Overflow, licensed under CC BY-SA 3.0.