Kafka testing MessageListener not working
I want to test my Kafka producer class, so I created a very basic one that only sends a string to a given topic. The configuration code is the default, since the properties are loaded from the application.yml file.
    @Slf4j
    @Service
    @RequiredArgsConstructor
    public class KafkaProducer {

        private final KafkaTemplate<String, String> kafkaStringTemplate;

        public void sendString(String topic, String payload) {
            ListenableFuture<SendResult<String, String>> future = kafkaStringTemplate.send(topic, payload);
            future.addCallback(new ListenableFutureCallback<>() {
                @Override
                public void onFailure(Throwable ex) {
                    log.warn("Error sending {} to {}", payload, topic, ex);
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    if (result != null) {
                        RecordMetadata recordMetadata = result.getRecordMetadata();
                        log.info("topic = {}, partition = {}, offset = {}, payload = {}",
                                recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), payload);
                    }
                }
            });
        }
    }
To test it I decided to use EmbeddedKafkaRule and create a consumer that listens for new messages:
    @Testcontainers
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka
    @AutoConfigureMockMvc
    class MyTest {

        @ClassRule
        public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "topic");

        @Autowired
        KafkaProducer kafkaProducer;

        private KafkaMessageListenerContainer<String, String> container;
        private BlockingQueue<ConsumerRecord<String, String>> records;

        // Renamed from setUp(): a static and an instance method cannot share a signature.
        @BeforeAll
        static void startBroker() {
            embeddedKafka.before();
        }

        @BeforeEach
        @SneakyThrows
        public void setUp() {
            Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps("group", "false", embeddedKafka.getEmbeddedKafka());
            //consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            //consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
            ContainerProperties containerProperties = new ContainerProperties("topic");
            container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
            records = new LinkedBlockingQueue<>();
            container.setupMessageListener((MessageListener<String, String>) record -> {
                System.out.println("test-listener received message= " + record);
                records.add(record);
            });
            container.start();
            ContainerTestUtils.waitForAssignment(container, embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        }

        // Renamed from tearDown() for the same reason as startBroker().
        @AfterAll
        static void stopBroker() {
            embeddedKafka.after();
        }

        @AfterEach
        public void tearDown() {
            container.stop();
        }

        @Test
        @SneakyThrows
        public void testSend() {
            String op = "SOMEOP";
            System.out.println("SENDING");
            kafkaProducer.sendString("topic", op);
            System.out.println("SENT");
            System.out.println("POLLING...");
            ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
            System.out.println("POLL");
            String r = received.toString();
            System.out.println(r);
        }
    }
The configuration is handled by this test/resource/application.yml:
    kafka:
      bootstrap-servers: ${spring.embedded.kafka.brokers}
      consumer:
        enabled: ${KAFKA_CONSUMER_ENABLED:true}
        auto-offset-reset: ${AUTO_OFFSET_RESET:earliest}
        group-id: ${CONSUMER_GROUP_ID:example-group}
        key-deserializer: ${KAFKA_KEY_DESERIALIZER_CLASS:org.apache.kafka.common.serialization.StringDeserializer}
        value-deserializer: ${KAFKA_VALUE_DESERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonDeserializer}
        properties:
          # Delegate deserializers
          spring.deserializer.key.delegate.class: ${KAFKA_KEY_DESERIALIZER_CLASS:org.apache.kafka.common.serialization.StringDeserializer}
          spring.deserializer.value.delegate.class: ${KAFKA_VALUE_DESERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonDeserializer}
          spring.json.trusted.packages: ${KAFKA_JSON_TRUSTED_PACKAGES:*}
      producer:
        enabled: ${KAFKA_PRODUCER_ENABLED:true}
        #key-serializer: ${KAFKA_KEY_SERIALIZER_CLASS:org.apache.kafka.common.serialization.StringSerializer}
        #value-serializer: ${KAFKA_VALUE_SERIALIZER_CLASS:org.springframework.kafka.support.serializer.JsonSerializer}
The onSuccess callback of the KafkaProducer future fires, which indicates that the data was correctly sent to a Kafka partition.
The problem is that the MessageListener never fires, so the received variable is null. What needs to be corrected?
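For context on the null: `BlockingQueue.poll(timeout, unit)` returns null when nothing arrives before the timeout, so `received.toString()` in the test would throw a NullPointerException rather than report a missing message. Below is a minimal, library-free sketch of the same listener-to-test handoff with an explicit timeout check. The class `PollCheck` and the helper `awaitRecord` are hypothetical names, and a plain thread stands in for the listener container. It does not explain why the listener stays silent. One thing that might be worth checking, as an assumption rather than a confirmed diagnosis, is that the class declares both @EmbeddedKafka and an EmbeddedKafkaRule, which may start two separate brokers so that the producer and the test consumer talk to different ones.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollCheck {

    // Hypothetical helper: waits for one element and fails with a clear message
    // on timeout instead of handing null back to the caller.
    static <T> T awaitRecord(BlockingQueue<T> queue, long timeout, TimeUnit unit)
            throws InterruptedException {
        T value = queue.poll(timeout, unit); // null if nothing arrives in time
        if (value == null) {
            throw new AssertionError("no record received within " + timeout + " " + unit);
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> records = new LinkedBlockingQueue<>();

        // Stand-in for the listener thread: delivers one record after a short delay.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            records.add("SOMEOP");
        });
        listener.start();
        System.out.println(awaitRecord(records, 2, TimeUnit.SECONDS));
        listener.join();

        // Nothing is ever added to this queue, so the helper reports the timeout.
        try {
            awaitRecord(new LinkedBlockingQueue<String>(), 100, TimeUnit.MILLISECONDS);
        } catch (AssertionError expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```

In the test above, the equivalent would be a null check (or an assertion) on `received` before calling `toString()`, so that a silent listener shows up as a readable test failure.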
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
