Kafka KStream: Convert Value to Long

The content of my stream looks like the following (output created with a Python script):

ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1222, timestamp=1652882637137, timestamp_type=0, key=b'auto', value=b'\x00\x00\x00\x00\x00\x00\x01\x9c', headers=[], checksum=None, serialized_key_size=4, serialized_value_size=8, serialized_header_size=-1)
ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1223, timestamp=1652882637137, timestamp_type=0, key=b'mann', value=b'\x00\x00\x00\x00\x00\x00\x00\xc6', headers=[], checksum=None, serialized_key_size=4, serialized_value_size=8, serialized_header_size=-1)
ConsumerRecord(topic='streams-wordcount-output', partition=0, offset=1224, timestamp=1652882637137, timestamp_type=0, key=b'new', value=b'\x00\x00\x00\x00\x00\x00\x02b', headers=[], checksum=None, serialized_key_size=3, serialized_value_size=8, serialized_header_size=-1)
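
Each value above is exactly 8 bytes, which is consistent with a big-endian 64-bit integer, the layout Kafka's LongSerializer writes. A minimal sketch in plain Java, assuming that interpretation is correct (the class name LongValueDecoder is hypothetical and not part of any Kafka API), decodes the three values shown:

```java
import java.nio.ByteBuffer;

class LongValueDecoder {
    // Interprets exactly 8 bytes as a big-endian signed 64-bit integer.
    static long decode(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected exactly 8 bytes");
        }
        // ByteBuffer reads in big-endian order by default.
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        // Byte values copied from the three records above.
        byte[] auto = {0, 0, 0, 0, 0, 0, 0x01, (byte) 0x9c};
        byte[] mann = {0, 0, 0, 0, 0, 0, 0x00, (byte) 0xc6};
        byte[] neu  = {0, 0, 0, 0, 0, 0, 0x02, 'b'}; // 'b' is 0x62

        System.out.println(decode(auto)); // 412
        System.out.println(decode(mann)); // 198
        System.out.println(decode(neu));  // 610
    }
}
```

Read this way, the values are word counts (412, 198, 610) rather than text.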

Now I want to convert the value to a Java Long. I tried things like .mapValues(v -> Long.decode(v)), but it doesn't work (I do not get any output). How can I convert the value to a number?

The code I use looks like the following:

       StreamsBuilder builder = new StreamsBuilder();
       KStream<String, String> textLines = builder.stream("streams-wordcount-output");
       
       var wordCounts = textLines
               .filter((x,y)-> x.contentEquals("new"))
               .toTable();

       wordCounts.toStream().print(Printed.toSysOut());
      
       KafkaStreams streams = new KafkaStreams(builder.build(), props);
       streams.start();
       
       Thread.sleep(4000);
       streams.close();

Output:

[KTABLE-TOSTREAM-0000000004]: new, _
[KTABLE-TOSTREAM-0000000004]: new, `

This doesn't work:

   var wordCounts = textLines
           .mapValues(v -> Long.decode(v))
           .filter((x,y)-> x.contentEquals("new"))
           .toTable();
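
A likely explanation, sketched here in plain Java: because the stream is typed KStream<String, String>, the 8 raw bytes are first turned into text, and Long.decode only accepts textual numerals such as "610" or "0x262". The class and method names below are hypothetical, and the String conversion only approximates what a String deserializer does:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class WhyLongDecodeFails {
    // Approximates a String deserializer: treat the raw bytes as UTF-8 text.
    static String asText(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    // Returns true if Long.decode rejects the text form of the bytes.
    static boolean decodeFails(byte[] value) {
        try {
            Long.decode(asText(value));
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    // Reads the same bytes as a big-endian 64-bit integer instead.
    static long asLong(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    public static void main(String[] args) {
        // Value of the record with key "new" at offset 1224.
        byte[] value = {0, 0, 0, 0, 0, 0, 0x02, 'b'};

        System.out.println(decodeFails(value)); // true
        System.out.println(asLong(value));      // 610
    }
}
```

In a Kafka Streams topology, an exception thrown inside mapValues would normally stop the stream thread, which may be why nothing is printed. The usual way to avoid the manual conversion is to tell the builder that the values are longs, for example builder.stream("streams-wordcount-output", Consumed.with(Serdes.String(), Serdes.Long())), which yields a KStream<String, Long>. This assumes the topic was written with a Long serializer, as the 8-byte values suggest.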


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
