Why does the sink operation execute multiple times in my Flink program?
I have a Flink program that reads from Kafka, and I opened three windowed streams: seconds, minutes, and hours. Each window result is then sent to another service by AsyncHttpSink, which extends RichSinkFunction. But I found that for the same window, a single Kafka message, and the same result, AsyncHttpSink.invoke() may be called multiple times, which made me curious. Shouldn't it be called just once for the same window, one Kafka message, and the same result?
hourOperator.addSink(httpSink(WindowType.h));
minuteOperator.addSink(httpSink(WindowType.m));
secondOperator.addSink(httpSink(WindowType.s));
/**
 * http sink
 */
public class AsyncHttpSink extends RichSinkFunction<Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>>> {
    public AsyncHttpSink(WindowType windowType) {
        this.windowType = windowType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .build();
        httpClient.start();
    }

    @Override
    public void close() throws Exception {
        httpClient.close();
    }

    @Override
    public void invoke(Tuple3<String, Long, Map<String, Tuple2<XXX, Object>>> tuple3, Context context) throws Exception {
        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                try {
                    logger.info("[httpSink]http sink completed.");
                } catch (IOException e) {
                    logger.error("[httpSink]http sink completed. exception:", e);
                }
            }

            @Override
            public void failed(Exception ex) {
                logger.error("[httpSink]http sink failed.", ex);
            }

            @Override
            public void cancelled() {
                logger.info("[httpSink]http sink cancelled.");
            }
        });
    }
}
Solution 1:[1]
If this is a keyed window, each distinct key that has results for a given window reports its results separately, so the sink is invoked once per key per window rather than once per window.
You may also have several parallel instances of the sink.
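The per-key behaviour can be illustrated without Flink at all. The following is a plain-Java sketch, not Flink API: `KeyedWindowSketch`, `Event`, and `fireWindows` are hypothetical names made up for this illustration, and the count aggregation is an assumption, since the question does not show the window function. It simulates a keyed tumbling window and lists every result that would reach the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency); all names are hypothetical.
class KeyedWindowSketch {

    /** An input record: a key plus an event timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Groups events into tumbling windows per key and returns one result
     * string per (window, key) pair. Each returned element stands for one
     * call to the sink's invoke() method.
     */
    static List<String> fireWindows(List<Event> events, long windowSizeMs) {
        // windowStart -> key -> number of events seen for that key
        Map<Long, Map<String, Long>> panes = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            panes.computeIfAbsent(windowStart, w -> new TreeMap<>())
                 .merge(e.key, 1L, Long::sum);
        }

        List<String> sinkCalls = new ArrayList<>();
        for (Map.Entry<Long, Map<String, Long>> pane : panes.entrySet()) {
            for (Map.Entry<String, Long> perKey : pane.getValue().entrySet()) {
                sinkCalls.add("key=" + perKey.getKey()
                        + " window=" + pane.getKey()
                        + " count=" + perKey.getValue());
            }
        }
        return sinkCalls;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 100));
        events.add(new Event("b", 200));
        events.add(new Event("a", 300));
        events.add(new Event("c", 1500));

        // One-second tumbling windows: [0, 1000) holds keys a and b,
        // [1000, 2000) holds key c.
        for (String call : fireWindows(events, 1000)) {
            System.out.println("sink.invoke(" + call + ")");
        }
    }
}
```

In this sketch the window starting at 0 produces two sink calls, one for key `a` and one for key `b`, even though it is a single window. If the real job keys the stream before windowing, the same effect would explain several invoke() calls per window; logging the key and window end time inside invoke() is one way to check whether the repeated calls really carry identical results.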
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | David Anderson |
