'Embedded Kafka contract tests randomly failing

I am using the spring cloud contrat to test a kafkaListsener of my service. I use EmbeddedKafka together with the spring cloud contract.

The listener in my service is implemeneted by @KafkaListener from spring kafka.

The contract test I have is like below:

    
@EmbeddedKafka
@SpringBootTest(classes = {ServiceApplication.class},
        properties = {"stubrunner.kafka.enabled=true",
                "stubrunner.stream.enabled=false",
                "spring.cloud.stream.function.autodetect=false"})
public class EventContractTest {

    @Autowired
    private StubTrigger stubTrigger;

    @SpyBean
    @Qualifier("KafkaEventListener")
    private EventListener eventListener;

    @BeforeEach
    public void setup() throws ExecutionException, InterruptedException {   
        Mockito.doNothing().when(eventListener).onEventReceived(any(), any());
    }
    
    @Test
    public void kafkaEventTest() throws ExecutionException, InterruptedException {
        stubTrigger.trigger("kafka-event");

        ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
        verify(eventListener, timeout(5000).times(1)).
                onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
  ....
    }

   @Test
    public void kafkaEventTest2() throws ExecutionException, InterruptedException {
        stubTrigger.trigger("kafka-event-2");

        ArgumentCaptor<Event> eventArgumentCaptor = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor<Bytes> headerArgumentCaptor = ArgumentCaptor.forClass(Bytes.class);
        verify(eventListener, timeout(5000).times(1)).
                onEventReceived(eventArgumentCaptor.capture(), headerArgumentCaptor.capture());
    ....
    }
}

when run the tests case, in most cases it will pass, but it will randomlly fail with the following exception:

[2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest Time elapsed: 30.177 s <<< ERROR! [2022-05-06T09:53:52.883Z] java.lang.IllegalStateException: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.sap.billing.bill.contracts.EventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] Caused by: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.sap.billing.bill.contracts.EventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z]

[2022-05-06T09:53:52.883Z] [ERROR] Tests run: 4, Failures: 1, Errors: 1, Skipped: 0, Time elapsed: 212.675 s <<< FAILURE! - in com.contracts.eventContractTest [2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest Time elapsed: 30.177 s <<< ERROR! [2022-05-06T09:53:52.883Z] java.lang.IllegalStateException: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.contracts.eventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] Caused by: java.util.concurrent.TimeoutException [2022-05-06T09:53:52.883Z] at com.contracts.eventContractTest.kafkaEventTest(EventContractTest.java:122) [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] [ERROR] kafkaEventTest2 Time elapsed: 11.191 s <<< FAILURE! [2022-05-06T09:53:52.883Z] org.mockito.exceptions.verification.TooManyActualInvocations: [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] EventListener.onEventReceived( [2022-05-06T09:53:52.883Z]
, [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z] ); [2022-05-06T09:53:52.883Z] Wanted 1 time: [2022-05-06T09:53:52.883Z] -> at com.messaging.kafka.listener.EventListener.onEventReceived(EventListener.java:49) [2022-05-06T09:53:52.883Z] But was 2 times: [2022-05-06T09:53:52.883Z] -> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [2022-05-06T09:53:52.883Z] -> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [2022-05-06T09:53:52.883Z] [2022-05-06T09:53:52.883Z]

And from the log it is throw by the stubtrigger.trigger() method. Is this a bug of the embedded kafka?

As a result, the first test case is failed due to the timeout exception. the second test case failed as well due to the listener method is consumed twice already, which means the event triggered by the first case is already consumed.



Solution 1:[1]

To test the integration, I previously performed the following steps in order and got the answer:

step 1:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

step 2:

@Component
public class KafkaProducer {
  private KafkaTemplate<String,String> kafkaTemplate;
  public void send(String topic,String payload){
    kafkaTemplate.send( topic,payload );
  }
}

step 3:

@Component
public class KafkaConsumer {
  String payload;
  @KafkaListener(topics ="test-topic",groupId = "test")
  public void receive(ConsumerRecord<?,?> consumerRecord){
  setPayload(consumerRecord.toString());
  }
  public void setPayload(String p){
    payload=p;
  }
  public String getPayload(){
    return payload;
  }
  
}

step 4:

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9094", "port" +
"=9094" })
public class EmbeddedKafkaIntegrationTest {
  @Autowired
  private KafkaConsumer consumer;
  @Autowired
  private KafkaProducer producer;
  private String topic="test-topic";
  @Test
  public void test_integration ()
  throws Exception {
    producer.send(topic, "Sending for test integration");
    Assert.assertTrue(consumer.getPayload().contains( "test-topic" ));
  }
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Ali Mohammadi