Overriding KStreams default serializer (ByteArraySerializer)
I can't seem to override the serializer of a topic to Serdes.String(). I'm trying a simple use case of reading from a topic (stream), and writing to a KTable. What I have so far:
@Component
class Processor {
    @Autowired
    public void process(final StreamsBuilder builder) {
        final Serde<String> stringSerde = Serdes.String();
        builder.stream("input_topic", Consumed.with(stringSerde, stringSerde))
            .filter((key, value) -> value.contains("ACTION"))
            .toTable(Materialized.as("output_table_materialized"))
            .toStream().to("output_table", Produced.with(stringSerde, stringSerde)); // EDIT: added this last line
    }
}
The exception I get is:
org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
From what I gather, it understands that the key is a String, but it's using the default serializer, ByteArraySerializer. Where am I going wrong in the above code?
Solution 1:[1]
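I faced a similar issue, and the solution was to specify the serdes on the Materialized instance, i.e. replacing
.toTable(Materialized.as("output_table_materialized"))
with
.toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("output_table_materialized").withKeySerde(stringSerde).withValueSerde(stringSerde))
The explicit type arguments on Materialized.as are there so that the chained withKeySerde and withValueSerde calls compile in Java; without them the key and value types are inferred as Object. KeyValueStore comes from org.apache.kafka.streams.state and Bytes from org.apache.kafka.common.utils.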
Solution 2:[2]
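Consumed.with only configures the deserializers used when reading from the input topic.
The error comes from a serializer, i.e. from the toTable call, which falls back to the default serdes when none are given. You can either pass the serdes to that call (toTable takes a Materialized rather than a Produced, e.g. Materialized.with(stringSerde, stringSerde)) or set the default serdes in your application properties.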
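For the second option (changing the defaults), here is a minimal sketch, assuming the configuration is built as a plain java.util.Properties object that is later handed to Kafka Streams. The keys default.key.serde and default.value.serde are the Kafka Streams config names; the class name DefaultSerdeConfig and its method are made up for illustration. In a Spring Boot application the same two keys can most likely be set under the spring.kafka.streams.properties prefix instead, but check that against your Spring Kafka version.

```java
import java.util.Properties;

// Hypothetical helper class; the name is illustrative and not from the original post.
class DefaultSerdeConfig {

    // Fully qualified name of Kafka's built-in String serde (a nested class of Serdes).
    static final String STRING_SERDE =
            "org.apache.kafka.common.serialization.Serdes$StringSerde";

    // Builds properties that make String the default key and value serde.
    // Required settings such as application.id and bootstrap.servers are omitted here.
    static Properties streamsDefaults() {
        Properties props = new Properties();
        props.setProperty("default.key.serde", STRING_SERDE);
        props.setProperty("default.value.serde", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsDefaults();
        System.out.println("default.key.serde=" + props.getProperty("default.key.serde"));
        System.out.println("default.value.serde=" + props.getProperty("default.value.serde"));
    }
}
```

With these defaults in place, any operator that is not given explicit serdes (such as the toTable call in the question) would use the String serde instead of the byte-array one. Passing serdes explicitly is still the safer choice when different topics carry different types.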
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Logan May |
| Solution 2 | OneCricketeer |
