'how to stop a flink window job over a data stream?
I have a program in which a producer creates events and send them to a Kafka topic. Then, I create a data stream over this topic and a flink sliding window process this data stream. In case the number of events in each window goes below a threshold, the program needs to change the producer to another one. Now, when this happen, I want to stop this flink window job in the code and start a new window over the same data stream which now consumes events from the new producer. My code is as below:
dataStream
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(window_size), Time.seconds(1)))
.process(new ProcessAllWindowFunction<DataEvent, String, TimeWindow>() {
@Override
public void process(Context context, Iterable<DataEvent> input,
Collector<String> out) throws Exception {
int count = 0;
for (DataEvent de : input) {
count++;
}
if (Controller.offset < query.getDuration()) {
out.collect("Window: " + context.window() + "count: " + count);
if (Controller.offset > window_size){
//do something
}
}
}
}
});
If I stop the producer to not send the data to the Kafka topic, I receive this error:
java.lang.IllegalStateException: Cannot perform operation after producer has been closed
at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:896)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:905)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:889)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:775)
at FlinkCEP01.Producer.producerStart(Producer.java:235)
at FlinkCEP01.Controller$1.doInBackground(Controller.java:262)
at javax.swing.SwingWorker$1.call(SwingWorker.java:295)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at javax.swing.SwingWorker.run(SwingWorker.java:334)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
My question is how can I stop the window analysis over the data stream?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
