'How to aggregate the elements in the window sessions flink?

I'm using the flink Session windows when it does not receive elements for a certain period of time,i.e; when a gap of inactivity occurred it should emit an event.

I configured the gap as 10 seconds in the flink job. And I sent the event1 and sends event2 after 5 seconds. These two events should belong to first window. The output should be an aggregate of these two events. But I am getting only the first event.

below is the code I tried:

    fun setupJob(env: StreamExecutionEnvironment) {

        val testStream = env.sampleStream()
                    .keyBy { it.f0 }
                    .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                   .process(MyProcessWindowFunction())
    
            testStream.map { it.toKafkaMessage() }
                    .kafkaSink<SampleOutput>() }
}

then MyProcessWindowFunction looks like

class MyProcessWindowFunction : ProcessWindowFunction<Tuple4<String, inputA?, inputB?, inputC?>, Tuple2<String, SampleOutput?>,
            String, TimeWindow>() {
    

    private lateinit var sampleOutputState: ValueState<SampleOutputState>

    override fun open(parameters: Configuration) {
        val SampleOutputStateDescriptor = ValueStateDescriptor("sample-output-state", SampleOutputState::class.java)
        SampleOutputState = runtimeContext.getState(SampleOutputStateDescriptor)
    }

    override fun process(key: String, context: Context, elements: MutableIterable<Tuple4<String, inputA?, inputB?, inputC?>, out: Collector<Tuple2<String, SampleOutput?>>) {
        val current = sampleOutputState.value()

        val value = elements.iterator().next()

        val latestState = when {
            value.f2 != null -> processCondition(value.f2!!, current)
            else -> return
        }
        sampleOutputState.update(latestState)
        out.collect(Tuple2(key, latestState))
    }

    private fun processInputB(inputB: InputB, currentState: SampleOutputState?): SampleOutputState {
        return currentState?.copy(
                timestamp = System.currentTimeMillis(),
                eventTime = condition.eventTime,

        )  ?: 
        createInputBState(inputB)
    }

    private fun createInputBState(inputB: InputB): SampleOutputState = SampleOutputState(
            id = UUID.randomUUID().toString(),
            timestamp = System.currentTimeMillis(),
            eventTime = condition.eventTime,
          
    )

}

I'm getting the only event1 but I wanted to get the aggregate of both the events (that I sent event1 and event2).

How do we get the aggregate of the events that are available with in a session ?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source