Kafka Streams: send only the first message within a window downstream
I am trying to implement data deduplication using Kafka Streams. Basically, I'd like to drop any duplicates after the first encountered message in a session window with a 1-second inactivity gap and an 8-hour grace period for late arrivals.
A more concrete example:
Input:
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.574 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }
Desired output:
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }
so the third message
{ sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.574 }
from the input stream should be dropped, since its sensor and current fields match a record already seen within the defined 1-second window.
Here's sample code:
streamsBuilder
    .stream(
        "input-topic",
        Consumed.with(Serdes.String(), telemetrySerde)
            // use the timestamp embedded in the Avro record as the event time
            .withTimestampExtractor(ValueTimestampExtractor())
    )
    .groupBy(
        { _, value ->
            TelemetryDuplicate(
                sensor = value.sensor,
                current = value.current
            )
        },
        Grouped.with(telemetryDuplicateSerde, telemetrySerde)
    )
    .windowedBy(
        SessionWindows.ofInactivityGapAndGrace(
            /* inactivityGap = */ Duration.ofSeconds(1),
            /* afterWindowEnd = */ Duration.ofHours(8)
        )
    )
    // always keep only the first record of the group
    .reduce { value, _ -> value }
    .toStream()
    .selectKey { k, _ -> k.key().sensor }
    .to("output-topic", Produced.with(Serdes.String(), telemetrySerde))
This actually works and does the job. HOWEVER, even though I rekey the resulting stream from the windowed key back to just the sensor id, I end up with the following messages in the output topic:
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:22.555 }
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: A1; Value: NULL
Key: A1; Value: { sensor: A1, current: 42, timestamp: Fri Apr 15 2022 21:59:23.876 }
Key: B2; Value: { sensor: B2, current: 42, timestamp: Fri Apr 15 2022 21:59:24.732 }
That means the stream is indeed de-duplicated, but in a rather awkward way: when a late record is merged into an existing session window, the window boundaries change, so a tombstone is emitted to retract the previous aggregation and the aggregate is then re-emitted, even though neither the selected key nor the value have changed (see how reduce is defined).
The question: is it possible to emit only the first record encountered in the window, without producing any tombstones or "updated" aggregations?
Cheers.
Solution 1:[1]
You can add a filter to keep the null values from being produced to your result topic:
    ...
    .selectKey { k, _ -> k.key().sensor }
    .filter { _, value -> value != null }
    .to("output-topic", Produced.with(Serdes.String(), telemetrySerde))
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | groo |
