Spark analog of Flink timers
I have a stream of events keyed by id (two events per id, one with START status and one with STOP), and I want to raise an alarm if the STOP event is not received within some period of time. Is it possible to use timers and process functions in Spark Structured Streaming the way Flink does? https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/process_function/

For example, Flink's `KeyedProcessFunction` can register a per-key event-time timer:
```java
@Override
public void processElement(
        Tuple2<String, String> value,
        Context ctx,
        Collector<Tuple2<String, Long>> out) throws Exception {

    // retrieve the current count
    CountWithTimestamp current = state.value();
    if (current == null) {
        current = new CountWithTimestamp();
        current.key = value.f0;
    }

    // update the state's count
    current.count++;

    // set the state's timestamp to the record's assigned event time timestamp
    current.lastModified = ctx.timestamp();

    // write the state back
    state.update(current);

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
```
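Spark Structured Streaming has no direct timer API, but its arbitrary stateful processing operator, `flatMapGroupsWithState` with a `GroupStateTimeout`, can play the same role: store per-id state when a START arrives, clear it when the STOP arrives, and emit the alarm when the state times out (the part Flink would do in the `onTimer` callback, which the excerpt above omits). Below is a minimal sketch under assumptions not in the question: a hypothetical `Event` bean with `id`, `status`, and `eventTime` fields, a 60-second deadline, and a 10-second watermark; class and method names are illustrative.

```java
import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupState;
import org.apache.spark.sql.streaming.GroupStateTimeout;
import org.apache.spark.sql.streaming.OutputMode;

public class StartStopAlarms {

    // Hypothetical input bean: one START and one STOP event per id.
    public static class Event implements Serializable {
        private String id;
        private String status;       // "START" or "STOP"
        private Timestamp eventTime;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getStatus() { return status; }
        public void setStatus(String status) { this.status = status; }
        public Timestamp getEventTime() { return eventTime; }
        public void setEventTime(Timestamp eventTime) { this.eventTime = eventTime; }
    }

    public static Dataset<String> alarms(Dataset<Event> events) {
        // State per id: the event-time deadline (ms) by which STOP must arrive.
        FlatMapGroupsWithStateFunction<String, Event, Long, String> fn =
            (String id, Iterator<Event> batch, GroupState<Long> state) -> {
                if (state.hasTimedOut()) {
                    // Deadline passed without a STOP: fire the alarm.
                    state.remove();
                    return Collections.singletonList("ALARM: no STOP for id " + id).iterator();
                }
                while (batch.hasNext()) {
                    Event e = batch.next();
                    if ("START".equals(e.getStatus())) {
                        // Timer analog: time out 60 s of event time after START.
                        long deadline = e.getEventTime().getTime() + 60_000L;
                        state.update(deadline);
                        state.setTimeoutTimestamp(deadline);
                    } else if ("STOP".equals(e.getStatus())) {
                        state.remove();  // STOP arrived in time: "cancel" the timer
                    }
                }
                return Collections.emptyIterator();
            };

        return events
            // A watermark on the event-time column is required for event-time timeouts.
            .withWatermark("eventTime", "10 seconds")
            .groupByKey((MapFunction<Event, String>) Event::getId, Encoders.STRING())
            .flatMapGroupsWithState(
                fn,
                OutputMode.Append(),
                Encoders.LONG(),     // state encoder
                Encoders.STRING(),   // output encoder
                GroupStateTimeout.EventTimeTimeout());
    }
}
```

One difference from a Flink timer: the timeout only actually fires when a micro-batch is triggered after the watermark passes the deadline, so the alarm can arrive later than the exact deadline. For a wall-clock variant, `GroupStateTimeout.ProcessingTimeTimeout()` with `state.setTimeoutDuration(...)` works the same way.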
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
