'How to unit test flink timers using mockito
I have a process funtion, im using mockito for unit testing.
class SampleProcessFunction : KeyedProcessFunction<String, String, String>() {{
override fun processElement(dm: String, ctx: Context, out: Collector<String>) {
var retrieveState = state.value()
// schedule the next timer from the current event time
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000)
}
override fun onTimer(scheduledTimer: Long, ctx: OnTimerContext?, out: Collector<String>) {
val retrieveState = state.value()
if(scheduledTimer == (ctx.timestamp()+3000)) {
out.collect(presenceEvent)
}
}
}
I wanted to unit test the process function using mockito
internal class PresenceDetectionProcessFunctionTest {
private lateinit var out: Collector<String>
private lateinit var mockPresenceState: ValueState<Presence>
@BeforeEach
fun setUp() {
SampleProcessFunction = SampleProcessFunction()
out = mock(Collector::class.java) as Collector<PresenceEvent>
val mockRunTimeContext = mock(RuntimeContext::class.java)
mockSampleState = mock(ValueState::class.java) as ValueState<SampleState>
whenever(mockRunTimeContext.getState(ValueStateDescriptor("sample-state",
Sample::class.java))).thenReturn(mockSampleState)
sampleProcessFunction.runtimeContext = mockRunTimeContext
sampleProcessFunction.open(Configuration())
}
@Test
fun `test sample process function`() {
sampleProcessFunction.processElement("Hellp, context, out)
Mockito.verify(mockPresenceState, times(1)).update(result.capture())
}
How do we test the timer here, in my process fun i have register the event timer, but when I debug throws NPE at ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 3000) in sample process function as I did not setup any test configuration for timers in my test. How do we test the flink timer functionality here? Any suggestions?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
