Spring Kafka Listener trying to convert String Payload to int
Here is a simple Kafka consumer that takes a keyed message payload. To keep things simple, I have made both the key and the value a String. Similar code works on spring-kafka version 2.2.5.RELEASE.
spring-kafka version: 2.5.13.RELEASE
spring-boot version: 2.5.0
KafkaConfig.java
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public DebugRevenueMessageConsumer debugRevenueMessageConsumer() {
        return new DebugRevenueMessageConsumer();
    }
}
DebugRevenueMessageConsumer.java
@RequiredArgsConstructor
@Slf4j
public class DebugRevenueMessageConsumer {

    // Receives each record's value as a String, plus the partition it was read from.
    @KafkaListener(topics = "${kafka.topic.name}")
    void debugRevenueEventListener(@Payload String record,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        // Parameterized logging avoids building the string when debug is disabled.
        log.debug("Received message {}, partition: {}", record, partition);
    }
}
application.yml:
spring:
  kafka:
    properties:
      ssl.endpoint.identification.algorithm: https
      security.protocol: 'SASL_SSL'
      sasl.mechanism: 'PLAIN'
      retry.backoff.ms: 500
    jaas:
      enabled: true
      control-flag: required
      login-module: ${kafka.jaas.login-module}
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1
    consumer:
      bootstrap-servers: ${kafka.server}
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 500
      fetch-max-wait: 100
      max-poll-records: 1
      properties:
        session.timeout.ms: 120000
        heartbeat.interval.ms: 30000
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
        confluent.monitoring.interceptor:
          bootstrap.servers: ${kafka.server}
          security.protocol: SASL_SSL
          sasl:
            mechanism: PLAIN
            jaas.config: ${kafka.jaas.config}
kafka:
  topic:
    name: "debug.revenue"
    id: "debug.revenue"
    group-id: "debug.revenue-group"
  server: "pkc-lz2dl.ap-southeast-2.aws.confluent.cloud:9092"
  username: W**************
  password: *****************************
  jaas:
    login-module: org.apache.kafka.common.security.plain.PlainLoginModule
    config: ${kafka.jaas.login-module} required username="${kafka.username}" password="${kafka.password}";
Produced message:
key: 1
value: "Message 0001"
The Kafka listener tries to convert the payload to int:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void au.com.fsp.revenue.consumer.DebugRevenueMessageConsumer.debugRevenueEventListener(java.lang.String,int)]
Bean [au.com.fsp.revenue.consumer.DebugRevenueMessageConsumer@7cccee14]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, kafka_groupId=debug-revenue}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, 
kafka_groupId=debug-revenue}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2679)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, kafka_groupId=debug-revenue}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter$$FastClassBySpringCGLIB$$cde8c01d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.cloud.sleuth.brave.instrument.messaging.MessageListenerMethodInterceptor.invoke(MessageListenerMethodInterceptor.java:58)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter$$EnhancerBySpringCGLIB$$f96cc241.onMessage(<generated>)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)
... 11 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:70)
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:141)
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
... 24 common frames omitted
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:47)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:192)
at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:175)
at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)
... 31 common frames omitted
Caused by: java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Integer.parseInt(Integer.java:638)
at java.base/java.lang.Integer.valueOf(Integer.java:983)
at org.springframework.util.NumberUtils.parseNumber(NumberUtils.java:211)
at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:64)
at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:50)
at org.springframework.core.convert.support.GenericConversionService$ConverterFactoryAdapter.convert(GenericConversionService.java:437)
at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41)
... 34 common frames omitted
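The innermost NumberFormatException can be reproduced without Kafka or Spring. The sketch below is only a simplified stand-in for what the trace shows (NumberUtils.parseNumber calling Integer.valueOf); the class name PayloadToIntDemo and the helper parseLikeTrace are hypothetical, and the whitespace stripping is an assumption inferred from the "TestMessagefordebugging" input string in the exception message.

```java
// Minimal sketch: feeds the quoted payload from the trace to Integer.valueOf.
// Not Spring code; parseLikeTrace is a hypothetical helper for illustration.
class PayloadToIntDemo {

    // Assumption: whitespace is removed before parsing, which would explain
    // why the trace reports the input without spaces.
    static Integer parseLikeTrace(String text) {
        String trimmed = text.replaceAll("\\s+", "");
        return Integer.valueOf(trimmed);
    }

    public static void main(String[] args) {
        // The payload as it appears in the trace, including its double quotes.
        String payload = "\"Test Message for debugging\"";
        try {
            parseLikeTrace(payload);
        } catch (NumberFormatException e) {
            // The message contains the payload with its whitespace removed.
            System.out.println(e.getMessage());
        }
    }
}
```

This suggests the String-to-int conversion itself behaves as expected; the open question is why the payload is being offered as the value of an int parameter at all.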
Why is spring-kafka trying to convert the message to int?
EDIT: I edited the original question to make the code as simple as possible, so I changed the payload value from a custom class to a String.
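One way to read the trace: the conversion to int happens inside KafkaNullAwarePayloadArgumentResolver, so the int parameter appears to have been resolved as a payload rather than as a header. The following is a rough sketch of that kind of fallback, not Spring's implementation. KnownHeader, OtherHeader, Listeners, plan and find are all hypothetical names, and the premise (a parameter whose annotation is not recognized falls through to payload conversion) is an assumption. If that premise holds, one thing that may be worth checking is whether @Header is imported from org.springframework.messaging.handler.annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

// Sketch of annotation-driven argument resolution with a payload fallback.
class ResolverFallbackSketch {

    // Hypothetical annotation that this toy "framework" recognizes.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface KnownHeader { String value(); }

    // Hypothetical look-alike annotation that the toy "framework" ignores.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface OtherHeader { String value(); }

    static class Listeners {
        void good(String record, @KnownHeader("partition") int partition) { }
        void bad(String record, @OtherHeader("partition") int partition) { }
    }

    // For each parameter, report where its value would be taken from.
    static String[] plan(Method method) {
        Parameter[] params = method.getParameters();
        String[] sources = new String[params.length];
        for (int i = 0; i < params.length; i++) {
            KnownHeader header = params[i].getAnnotation(KnownHeader.class);
            sources[i] = (header != null)
                    ? "header:" + header.value()
                    // Fallback: try to convert the payload to the parameter type.
                    : "payload->" + params[i].getType().getSimpleName();
        }
        return sources;
    }

    static Method find(String name) {
        try {
            return Listeners.class.getDeclaredMethod(name, String.class, int.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", plan(find("good"))));
        System.out.println(String.join(", ", plan(find("bad"))));
    }
}
```

In this toy model the second parameter of `bad` is planned as `payload->int`, which mirrors the symptom in the trace: a String payload handed to a String-to-int converter.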
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
