Spring Kafka Listener trying to convert String Payload to int

Here is a simple Kafka consumer that takes a keyed message payload. To keep things simple, both the key and the value are Strings. Similar code works on spring-kafka version 2.2.5.RELEASE.

spring-kafka version: 2.5.13.RELEASE

spring-boot version: 2.5.0

KafkaConfig.java

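import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka
public class KafkaConfig {
    // Registers the listener bean so that @KafkaListener methods are detected.
    @Bean
    public DebugRevenueMessageConsumer debugRevenueMessageConsumer() {
        return new DebugRevenueMessageConsumer();
    }
}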

DebugRevenueMessageConsumer.java

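import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

@RequiredArgsConstructor
@Slf4j
public class DebugRevenueMessageConsumer {

    // Logs each record's payload together with the partition it was read from.
    @KafkaListener(topics = "${kafka.topic.name}")
    void debugRevenueEventListener(@Payload String record,
                                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.debug("Received message {}, partition: {}", record, partition);
    }
}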

application.yml:

spring:
  kafka:
    properties:
      ssl.endpoint.identification.algorithm: https
      security.protocol: 'SASL_SSL'
      sasl.mechanism: 'PLAIN'
      retry.backoff.ms: 500
    jaas:
      enabled: true
      control-flag: required
      login-module: ${kafka.jaas.login-module}
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1
    consumer:
      bootstrap-servers: ${kafka.server}
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 500
      fetch-max-wait: 100
      max-poll-records: 1
      properties:
        session.timeout.ms: 120000
        heartbeat.interval.ms: 30000
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
        confluent.monitoring.interceptor:
          bootstrap.servers: ${kafka.server}
          security.protocol: SASL_SSL
          sasl:
            mechanism: PLAIN
            jaas.config: ${kafka.jaas.config}

kafka:
  topic:
    name: "debug.revenue"
    id: "debug.revenue"
    group-id: "debug.revenue-group"
  server: "pkc-lz2dl.ap-southeast-2.aws.confluent.cloud:9092"
  username: W**************
  password: *****************************
  jaas:
    login-module: org.apache.kafka.common.security.plain.PlainLoginModule
    config: ${kafka.jaas.login-module} required username="${kafka.username}" password="${kafka.password}";

Produced message: key: 1 value: "Message 0001"

The Kafka listener tries to convert the payload to int:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [void au.com.fsp.revenue.consumer.DebugRevenueMessageConsumer.debugRevenueEventListener(java.lang.String,int)]
Bean [au.com.fsp.revenue.consumer.DebugRevenueMessageConsumer@7cccee14]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, kafka_groupId=debug-revenue}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, 
kafka_groupId=debug-revenue}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2679)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2649)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2609)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2536)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2427)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2305)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1355)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1247)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging"", failedMessage=GenericMessage [payload="Test Message for debugging", headers={kafka_offset=5, kafka_consumer=brave.kafka.clients.TracingConsumer@5b8a68f8, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=2, kafka_receivedMessageKey="01", kafka_receivedTopic=debug.revenue, kafka_receivedTimestamp=1648947488475, kafka_acknowledgment=Acknowledgment for debug.revenue-2@5, kafka_groupId=debug-revenue}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:352)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:92)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:53)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter$$FastClassBySpringCGLIB$$cde8c01d.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.cloud.sleuth.brave.instrument.messaging.MessageListenerMethodInterceptor.invoke(MessageListenerMethodInterceptor.java:58)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter$$EnhancerBySpringCGLIB$$f96cc241.onMessage(<generated>)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2629)
    ... 11 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '"Test Message for debugging"' to 'int'; nested exception is org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:70)
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:141)
    at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:347)
    ... 24 common frames omitted
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [int] for value '"Test Message for debugging"'; nested exception is java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:47)
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:192)
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:175)
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66)
    ... 31 common frames omitted
Caused by: java.lang.NumberFormatException: For input string: ""TestMessagefordebugging""
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.base/java.lang.Integer.parseInt(Integer.java:638)
    at java.base/java.lang.Integer.valueOf(Integer.java:983)
    at org.springframework.util.NumberUtils.parseNumber(NumberUtils.java:211)
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:64)
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:50)
    at org.springframework.core.convert.support.GenericConversionService$ConverterFactoryAdapter.convert(GenericConversionService.java:437)
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41)
    ... 34 common frames omitted
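
The innermost cause in this trace can be reproduced without Kafka or Spring. The sketch below is a plain-JDK approximation, not Spring's actual code. It assumes that all whitespace is stripped from the text before it reaches `Integer.valueOf`, which would explain why the input string in the exception reads `"TestMessagefordebugging"` with no spaces. The names `PayloadToIntRepro`, `stripWhitespace`, and `parseLikeConverter` are made up for illustration.

```java
public class PayloadToIntRepro {

    // Hypothetical helper: removes every whitespace character, mimicking the
    // whitespace stripping that the trace suggests happens before parsing.
    static String stripWhitespace(String text) {
        StringBuilder sb = new StringBuilder(text.length());
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (!Character.isWhitespace(c)) {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Hypothetical helper: returns the parsed value as text, or the exception
    // message when the stripped text is not a valid int.
    static String parseLikeConverter(String payload) {
        try {
            return String.valueOf(Integer.valueOf(stripWhitespace(payload)));
        } catch (NumberFormatException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // The payload from the trace, including its literal double quotes.
        System.out.println(parseLikeConverter("\"Test Message for debugging\""));
        // A numeric payload converts without complaint.
        System.out.println(parseLikeConverter(" 42 "));
    }
}
```

If this reading is right, the String parameter itself is not the problem: something requested the payload as an `int`, and the only `int` in the listener signature is `partition`.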

Why is spring-kafka trying to convert the message to int?
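
The trace shows `PayloadMethodArgumentResolver` being asked for an `int`, so the `partition` parameter appears to have been treated as the payload instead of as a header. A minimal model of that kind of fallback, assuming a "first matching rule wins, everything else is payload" chain, might look like the sketch below. It is a simplified illustration rather than Spring's implementation: `HeaderMarker`, `Listeners`, `classify`, and `describe` are hypothetical names, and `HeaderMarker` is only a stand-in for a header annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

public class ResolverFallbackModel {

    // Stand-in for a header annotation; this is NOT Spring's @Header.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface HeaderMarker {
        String value();
    }

    static class Listeners {
        void annotated(String record, @HeaderMarker("partition") int partition) { }

        void unannotated(String record, int partition) { }
    }

    // A parameter is a header only if the marker annotation is visible on it;
    // anything that no rule claims falls through to "payload".
    static String classify(Parameter p) {
        if (p.isAnnotationPresent(HeaderMarker.class)) {
            return "header:" + p.getType().getSimpleName();
        }
        return "payload:" + p.getType().getSimpleName();
    }

    // Lists the role assigned to each parameter of the named listener method.
    static List<String> describe(String methodName) {
        try {
            Method m = Listeners.class.getDeclaredMethod(methodName, String.class, int.class);
            List<String> roles = new ArrayList<>();
            for (Parameter p : m.getParameters()) {
                roles.add(classify(p));
            }
            return roles;
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("annotated"));   // [payload:String, header:int]
        System.out.println(describe("unannotated")); // [payload:String, payload:int]
    }
}
```

In this model, the second case is the one that would end in an attempt to turn the message text into an `int`. Whether something similar happens here, for example because the header annotation is not seen on the parameter at runtime, is an assumption that still needs to be confirmed.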

EDIT: I simplified the code in the original question as much as possible, which included changing the payload value from a custom class to a String.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
