Kafka testing: MessageListener not working

I want to test my Kafka producer class, so I created a very basic one that only sends a string to a given topic. There is no custom configuration code, since the properties are loaded from the application.yml file.

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, String> kafkaStringTemplate;

    public void sendString(String topic, String payload) {
        ListenableFuture<SendResult<String, String>> future = kafkaStringTemplate.send(topic, payload);

        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                log.warn("Error sending {} to {}", payload, topic, ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                if (result != null) {
                    RecordMetadata recordMetadata = result.getRecordMetadata();
                    log.info("topic = {}, partition = {}, offset = {}, payload = {}",
                            recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), payload);
                }
            }
        });
    }
}
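
As a side note on the callback style: the ListenableFuture return type above belongs to the 2.x line of Spring for Apache Kafka, while in 3.x KafkaTemplate.send returns a CompletableFuture. A minimal JDK-only sketch of the same success/failure handling follows. It is an illustration under assumptions, not the library's API: a plain CompletableFuture<String> stands in for the send result, and the class SendCallbackSketch and method describe are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in: a CompletableFuture<String> plays the role of the send result.
class SendCallbackSketch {

    // Attaches a single callback that covers both outcomes and returns a log-style message.
    static CompletableFuture<String> describe(CompletableFuture<String> sendFuture,
                                              String topic, String payload) {
        return sendFuture.handle((result, ex) -> ex != null
                ? "Error sending " + payload + " to " + topic + ": " + ex.getMessage()
                : "topic = " + topic + ", result = " + result + ", payload = " + payload);
    }

    public static void main(String[] args) {
        // Success path: the future is already completed with a value.
        CompletableFuture<String> ok = CompletableFuture.completedFuture("offset 0");
        System.out.println(describe(ok, "topic", "SOMEOP").join());

        // Failure path: the future is completed with an exception.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(describe(failed, "topic", "SOMEOP").join());
    }
}
```

Using one handle callback keeps both branches in the same place, which mirrors the onSuccess/onFailure pair in the class above.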

To test it, I decided to use EmbeddedKafkaRule and create a consumer that listens for new messages:

@Testcontainers
@SpringBootTest
@DirtiesContext
@EmbeddedKafka
@AutoConfigureMockMvc
class MyTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "topic");

    @Autowired
    KafkaProducer kafkaProducer;

    private KafkaMessageListenerContainer<String, String> container;

    private BlockingQueue<ConsumerRecord<String, String>> records;

    @BeforeAll
    static void setUp() {
        embeddedKafka.before();
    }

    @BeforeEach
    @SneakyThrows
    public void setUp() {
        Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("group", "false", embeddedKafka.getEmbeddedKafka());
        //consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

        ContainerProperties containerProperties = new ContainerProperties("topic");

        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        records = new LinkedBlockingQueue<>();

        container.setupMessageListener((MessageListener<String, String>) record -> {
            System.out.println("test-listener received message= " + record);
            records.add(record);
        });

        container.start();

        ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
    }


    @AfterAll
    static void tearDown() {
        embeddedKafka.after();
    }

    @AfterEach
    public void tearDown() {
        container.stop();
    }

    @Test
    @SneakyThrows
    public void testSend() {
        String op = "SOMEOP";

        System.out.println("SENDING");
        kafkaProducer.sendString("topic", op);
        System.out.println("SENT");
        System.out.println("POLLING...");
        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
        System.out.println("POLL");

        String r = received.toString();

        System.out.println(r);
    }
}

And this is the test/resource/application.yml that handles the configuration:

  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    consumer:
      enabled: ${KAFKA_CONSUMER_ENABLED:true}
      auto-offset-reset: ${AUTO_OFFSET_RESET:earliest}
      group-id: ${CONSUMER_GROUP_ID:example-group}
      key-deserializer: ${KAFKA_KEY_DESERIALIZER_CLASS:org.apache.kafka.common.serialization.StringDeserializer}
      value-deserializer: ${KAFKA_VALUE_DESERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonDeserializer}
    properties:
      # Delegate deserializers
      spring.deserializer.key.delegate.class: ${KAFKA_KEY_DESERIALIZER_CLASS:org.apache.kafka.common.serialization.StringDeserializer}
      spring.deserializer.value.delegate.class: ${KAFKA_VALUE_DESERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonDeserializer}
      spring.json.trusted.packages: ${KAFKA_JSON_TRUSTED_PACKAGES:*}

    producer:
      enabled: ${KAFKA_PRODUCER_ENABLED:true}
      #key-serializer: ${KAFKA_KEY_SERIALIZER_CLASS:org.apache.kafka.common.serialization.StringSerializer}
      #value-serializer: ${KAFKA_VALUE_SERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonSerializer}

The onSuccess callback of the KafkaProducer future is invoked, which indicates that the data was sent to a Kafka partition successfully.

The problem is that the MessageListener never fires, so the received variable is null. What needs to be corrected?
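
One detail about the null itself: BlockingQueue.poll(timeout, unit) returns null when nothing arrives before the timeout, so received.toString() then fails with a NullPointerException instead of a descriptive message. The sketch below shows a JDK-only helper that fails loudly on a timeout. It does not explain why the listener stays silent; it only makes the timeout visible. The class QueuePolling and method pollOrFail are hypothetical names, not part of the test above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original test class.
class QueuePolling {

    // Waits up to timeoutMillis for an element and throws instead of returning null.
    static <T> T pollOrFail(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            T value = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (value == null) {
                throw new IllegalStateException("No record received within " + timeoutMillis + " ms");
            }
            return value;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the interruption as a failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a record", e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();
        records.add("SOMEOP");
        System.out.println(pollOrFail(records, 100)); // element is available immediately

        try {
            pollOrFail(records, 100); // queue is now empty, so this times out
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the test, a call such as pollOrFail(records, 10_000) could replace the bare records.poll(10, TimeUnit.SECONDS), so that a silent listener shows up as an explicit failure.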



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow