Kafka KStream: Convert Value to Long
The content of my stream looks like the following (output created with a Python consumer script):
ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1222, timestamp=1652882637137, timestamp_type=0, key=b'auto', value=b'\x00\x00\x00\x00\x00\x00\x01\x9c', headers=[], checksum=None, serialized_key_size=4, serialized_value_size=8, serialized_header_size=-1)
ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1223, timestamp=1652882637137, timestamp_type=0, key=b'mann', value=b'\x00\x00\x00\x00\x00\x00\x00\xc6', headers=[], checksum=None, serialized_key_size=4, serialized_value_size=8, serialized_header_size=-1)
ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1224, timestamp=1652882637137, timestamp_type=0, key=b'new', value=b'\x00\x00\x00\x00\x00\x00\x02b', headers=[], checksum=None, serialized_key_size=3, serialized_value_size=8, serialized_header_size=-1)
Now I want to convert the value to a Java Long. I tried things like .mapValues(v -> Long.decode(v)), but that doesn't work (I do not get any output). How can I convert the value to a number?
The code I use looks like the following:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-wordcount-output");
var wordCounts = textLines
.filter((x,y)-> x.contentEquals("new"))
.toTable();
wordCounts.toStream().print(Printed.toSysOut());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Thread.sleep(4000);
streams.close();
Output:
[KTABLE-TOSTREAM-0000000004]: new, _
[KTABLE-TOSTREAM-0000000004]: new, `
This doesn't work:
var wordCounts = textLines
.mapValues(v -> Long.decode(v))
.filter((x,y)-> x.contentEquals("new"))
.toTable();
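A note on what the bytes above contain: each value is exactly 8 bytes long (serialized_value_size=8), which matches a big-endian 64-bit long as written by Kafka's LongSerializer. Read as a String, those bytes are mostly control characters, so Long.decode(v) most likely throws a NumberFormatException instead of returning a number. The following is a minimal sketch, using only the JDK, of how those 8 bytes map to a long. The class name LongValueDecoder and the method decode are hypothetical names chosen for illustration, and the byte arrays are copied from the three records shown above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration; not part of Kafka.
public class LongValueDecoder {

    // Interprets exactly 8 bytes as a big-endian signed 64-bit integer.
    static long decode(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected exactly 8 bytes");
        }
        // ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        // Values copied from the ConsumerRecord output above.
        byte[] auto = {0, 0, 0, 0, 0, 0, 0x01, (byte) 0x9c};
        byte[] mann = {0, 0, 0, 0, 0, 0, 0x00, (byte) 0xc6};
        // Python prints the byte 0x62 as the ASCII letter 'b'.
        byte[] neu = {0, 0, 0, 0, 0, 0, 0x02, 0x62};

        System.out.println("auto = " + decode(auto)); // 412
        System.out.println("mann = " + decode(mann)); // 198
        System.out.println("new  = " + decode(neu));  // 610
    }
}
```

In the Streams application itself, manual decoding should not be needed if the topic is read with a Long value serde, for example builder.stream("streams-wordcount-output", Consumed.with(Serdes.String(), Serdes.Long())). This gives a KStream<String, Long> and assumes the topic was indeed written with a Long serializer, as the 8-byte values suggest.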
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
