Kafka Streams WindowStore build error - Serde<String> incompatible with Serde<Object>
I'm trying to use the aggregate function on a Kafka Streams tumbling window. However, I'm getting this strange build error.
This is what my code looks like:
kStreamBuilder.stream(topic, Consumed.with(STRING_SERDE, MYBEAN_SERDE))
    .groupBy((key, value) -> value.id(), Grouped.with(Serdes.String(), MYBEAN_SERDE))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        Aggregration::new,
        (key, value, aggregration) -> {
            aggregration.count++;
            return aggregration;
        },
        Materialized.as(Stores.persistentWindowStore("asd", Duration.ofMinutes(1), Duration.ofMinutes(1), false))
            .withKeySerde(STRING_SERDE)
            .withValueSerde(AGGREGRATION_SERDE)
    );
This is the error I get at the ".withKeySerde(STRING_SERDE)" call:
java: incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.String> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>
Solution 1:[1]
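Found the solution here: Kafka Streams Materialized Store Build Error
Materialized.as(...) is a generic method. Because more calls are chained directly onto its result, the compiler has no target type from which to infer the key and value types, so it falls back to Object, and withKeySerde then expects a Serde<Object>. Specifying the type arguments explicitly as Materialized.<String,Aggregration>as(...) fixes the build.
Here is the updated code: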
kStreamBuilder.stream(topic, Consumed.with(STRING_SERDE, MYBEAN_SERDE))
    .groupBy((key, value) -> value.id(), Grouped.with(Serdes.String(), MYBEAN_SERDE))
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(
        Aggregration::new,
        (key, value, aggregration) -> {
            aggregration.count++;
            return aggregration;
        },
        Materialized.<String,Aggregration>as(Stores.persistentWindowStore("asd", Duration.ofMinutes(1), Duration.ofMinutes(1), false))
            .withKeySerde(STRING_SERDE)
            .withValueSerde(AGGREGRATION_SERDE)
    );
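The same inference behaviour can be reproduced without Kafka on the classpath. The following is only a minimal sketch: `Codec` and `Config` are hypothetical stand-ins for `Serde` and `Materialized`, invented for illustration, and are not part of the Kafka Streams API. It shows the explicit type arguments used above, plus an alternative that should work for the same reason (assigning the result of the generic factory to a typed local variable before chaining).

```java
public class InferenceDemo {
    // Hypothetical stand-in for Serde<T>; not a Kafka type.
    public interface Codec<T> {
        String name();
    }

    // Hypothetical stand-in for Materialized<K, V, S>; not a Kafka type.
    public static final class Config<K, V> {
        private final String storeName;
        private Codec<K> keyCodec;
        private Codec<V> valueCodec;

        private Config(String storeName) {
            this.storeName = storeName;
        }

        // Generic static factory: K and V appear only in the return type.
        public static <K, V> Config<K, V> as(String storeName) {
            return new Config<>(storeName);
        }

        public Config<K, V> withKeyCodec(Codec<K> codec) {
            this.keyCodec = codec;
            return this;
        }

        public Config<K, V> withValueCodec(Codec<V> codec) {
            this.valueCodec = codec;
            return this;
        }

        public String describe() {
            return storeName + "[" + keyCodec.name() + "," + valueCodec.name() + "]";
        }
    }

    static final Codec<String> STRING_CODEC = () -> "string";
    static final Codec<Long> LONG_CODEC = () -> "long";

    // Fix 1: explicit type arguments on the factory call.
    public static Config<String, Long> buildExplicit() {
        // Without "<String, Long>", K and V would be inferred as Object here,
        // and withKeyCodec(STRING_CODEC) would be rejected with
        // "Codec<String> cannot be converted to Codec<Object>".
        return Config.<String, Long>as("counts")
                .withKeyCodec(STRING_CODEC)
                .withValueCodec(LONG_CODEC);
    }

    // Fix 2: a typed local variable gives the compiler a target type.
    public static Config<String, Long> buildWithLocal() {
        Config<String, Long> config = Config.as("counts");
        return config.withKeyCodec(STRING_CODEC).withValueCodec(LONG_CODEC);
    }

    public static void main(String[] args) {
        System.out.println(buildExplicit().describe());
        System.out.println(buildWithLocal().describe());
    }
}
```

Both variants build the same configuration. The explicit type arguments keep everything in one expression, which is convenient inside a longer fluent chain such as the aggregate call above.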
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | marc |
