Kafka Streams: send only the first message within a window downstream

I am trying to implement data deduplication using Kafka Streams. Basically, I'd like to drop any duplicates of the first encountered message within a session window with a 1-second inactivity gap and an 8-hour grace period for late arrivals.

A more concrete example:

Input:

Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.574 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }

Desired output:

Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }

so the third message

{ sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.574 }

from the input stream should be dropped, since its sensor and current fields match what we already have within the defined 1-second window.

Here's sample code:

    streamsBuilder
      .stream(
        "input-topic",
        Consumed.with(Serdes.String(), telemetrySerde)
          // use the timestamp embedded in the AVRO record as the event timestamp
          .withTimestampExtractor(ValueTimestampExtractor()) 
      )
      .groupBy(
        { _, value ->
          TelemetryDuplicate(
            sensor = value.sensor,
            current = value.current
          )
        },
        Grouped.with(telemetryDuplicateSerde, telemetrySerde)
      )
      .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
          /* inactivityGap = */ Duration.ofSeconds(1),
          /* afterWindowEnd = */ Duration.ofHours(8)
        )
      )
      // always keep only the first record of the group
      .reduce { value, _ -> value } 
      .toStream()
      .selectKey { k, _ -> k.key().sensor }
      .to("output-topic", Produced.with(Serdes.String(), telemetrySerde))

This actually works and does the job. HOWEVER, even though I re-key the resulting stream from the windowed key back to just the sensor id, I end up with the following messages in the output-topic:

Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: A1; Value: NULL
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }

That means the stream is indeed de-duplicated, but in a rather awkward way: because the late record changes the session window, Kafka Streams produces a tombstone that cancels the previous aggregation and then re-emits the result for the merged window, even though neither the selected key nor the value has changed (see how reduce is defined).

The question: is it possible to somehow produce only the first encountered record per window and not emit any tombstones or "updated" aggregations?

Cheers.



Solution 1:[1]

You can add a filter so that the null values are not produced to your result topic:

    ...
      .selectKey { k, _ -> k.key().sensor }
      .filter { _, value -> value != null }
      .to("output-topic", Produced.with(Serdes.String(), telemetrySerde))

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: groo