'Embedded Kafka contract tests randomly failing
I am using the spring cloud contrat to test a kafkaListsener of my service. I use EmbeddedKafka together with the spring cloud contract.
The listener in my service is implemeneted by @KafkaListener from spring kafka.
The contract test I have is like below:
@EmbeddedKafka
@SpringBootTest(classes = {ServiceApplication.class},
properties = {"stubrunner.kafka.enabled=true",
"stubrunner.stream.enabled=false",
"spring.cloud.stream.function.autodetect=false"})
public class EventContractTest {
@Autowired
private StubTrigger stubTrigger;
@SpyBean
@Qualifier("KafkaEventListener")
private EventListener eventListener;
@BeforeEach
public void setup() throws ExecutionException, InterruptedException {
Mockito.doNothing().when(eventListener).onEventReceived(any(), any());
}
@Test
public void kafkaEventTest() throws ExecutionException, InterruptedException {
stubTrigger.trigger("kafka-event");
ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
verify(eventListener, timeout(5000).times(1)).
onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
....
}
@Test
public void kafkaEventTest2() throws ExecutionException, InterruptedException {
stubTrigger.trigger("kafka-event-2");
ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
verify(eventListener, timeout(5000).times(1)).
onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
....
}
}
when run the tests case, in most cases it will pass, but it will randomlly fail with the following exception:
[2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest Time elapsed: 30.177 s <<< ERROR! [2022-05-06T09:53:52.883Z] java.lang.IllegalStateException: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.sap.billing.bill.contracts.EventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] Caused by: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.sap.billing.bill.contracts.EventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z]
[2022-05-06T09:53:52.883Z] [ERROR] Tests run: 4, Failures: 1, Errors: 1, Skipped: 0, Time elapsed: 212.675 s <<< FAILURE! - in com.contracts.eventContractTest [2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest Time elapsed: 30.177 s <<< ERROR! [2022-05-06T09:53:52.883Z] java.lang.IllegalStateException: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.contracts.eventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] Caused by: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.contracts.eventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest2 Time elapsed: 11.191 s <<< FAILURE! [2022-05-06T09:53:52.883Z] org.mockito.exceptions.verification.TooManyActualInvocations: [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] EventListener.onEventReceived( [2022-05-06T09:53:52.883Z]
, [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] ); [2022-05-06T09:53:52.883Z] Wanted 1 time: [2022-05-06T09:53:52.883Z] -> at com.messaging.kafka.listener.EventListener.onEventReceived(EventListener.java:49) [2022-05-06T09:53:52.883Z] But was 2 times: [2022-05-06T09:53:52.883Z] -> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [2022-05-06T09:53:52.883Z] -> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]
And from the log it is throw by the stubtrigger.trigger() method. Is this a bug of the embedded kafka?
As a result, the first test case is failed due to the timeout exception. the second test case failed as well due to the listener method is consumed twice already, which means the event triggered by the first case is already consumed.
Solution 1:[1]
To test the integration, I previously performed the following steps in order and got the answer:
step 1:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.2.6.RELEASE</version>
<scope>test</scope>
</dependency>
step 2:
@Component
public class KafkaProducer {
private KafkaTemplate<String,String> kafkaTemplate;
public void send(String topic,String payload){
kafkaTemplate.send( topic,payload );
}
}
step 3:
@Component
public class KafkaConsumer {
String payload;
@KafkaListener(topics ="test-topic",groupId = "test")
public void receive(ConsumerRecord<?,?> consumerRecord){
setPayload(consumerRecord.toString());
}
public void setPayload(String p){
payload=p;
}
public String getPayload(){
return payload;
}
}
step 4:
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9094", "port" +
"=9094" })
public class EmbeddedKafkaIntegrationTest {
@Autowired
private KafkaConsumer consumer;
@Autowired
private KafkaProducer producer;
private String topic="test-topic";
@Test
public void test_integration ()
throws Exception {
producer.send(topic, "Sending for test integration");
Assert.assertTrue(consumer.getPayload().contains( "test-topic" ));
}
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Ali Mohammadi |
