'How to unit test flink timers using mockito

I have a process funtion, im using mockito for unit testing.

class SampleProcessFunction : KeyedProcessFunction<String, String, String>() {{
        override fun processElement(dm: String, ctx: Context, out: Collector<String>) {
            var retrieveState = state.value()
            // schedule the next timer  from the current event time
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
        } 
    
     override fun onTimer(scheduledTimer: Long, ctx: OnTimerContext?, out: Collector<String>) {
            val retrieveState = state.value()
    
            if(scheduledTimer == (ctx.timestamp()+3000)) {
                         out.collect(presenceEvent)
            }
        }
}

I wanted to unit test the process function using mockito

 internal class PresenceDetectionProcessFunctionTest {
    
        private lateinit var out: Collector<String>
        private lateinit var mockPresenceState: ValueState<Presence>
    
    
        @BeforeEach
        fun setUp() {
            SampleProcessFunction = SampleProcessFunction()
            out = mock(Collector::class.java) as Collector<PresenceEvent>
            val mockRunTimeContext = mock(RuntimeContext::class.java)
            mockSampleState = mock(ValueState::class.java) as ValueState<SampleState>
           
    
            whenever(mockRunTimeContext.getState(ValueStateDescriptor("sample-state", 
             Sample::class.java))).thenReturn(mockSampleState)      
            
             sampleProcessFunction.runtimeContext = mockRunTimeContext
            sampleProcessFunction.open(Configuration())
        }
    
    @Test
    fun `test sample process function`() {
       
        sampleProcessFunction.processElement("Hellp, context, out)
        Mockito.verify(mockPresenceState, times(1)).update(result.capture())
       }

How do we test the timer here, in my process fun i have register the event timer, but when I debug throws NPE at ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000) in sample process function as I did not setup any test configuration for timers in my test. How do we test the flink timer functionality here? Any suggestions?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source