'Flink - Need way to notify one stream from another
I have an Apache flink usecase that works as follows:
I have data events coming in through first stream. Part of each event is a foreign key for which I expect data from the second stream. E.g.: I am getting data for major cities in the first stream which has a city-code and I need the average temperature over time for this city code streamed through the second stream. It is not possible to have temperatures streamed for all possible cities, we have to request the city for which we need the data.
So we need some way to "notify" the second stream source that we need data for this city "pushed" when we encounter it the first time in the first stream.
This would have been easy if this notification could be done from the first stream. The problem is that the second stream is coming to us through a websocket part of which is a control channel through which we have to make the request - so the request HAS to be made from the second stream.
- Check event in the first stream. Read city code x.
- Have we seen this city code? If not, notify the second stream, we need data for city code x.
- Second stream sends message to source for data for x.
- Data starts flowing in for city x, which is used to join downstream.
If notification from the first stream was possible, this would be easy - I could have done it from step 2, so data starts flowing in the second stream. But that is not possible as the request needs to be send on the same websocket connection that feeds the second stream.
I have explored using CoProcessFunction or RichCoMapFunction for this - but it is not clear how this can be done. I have seen some examples of Broadcast State Pattern - but even that does not seem to fit the usecase.
Can someone help me with some pointers on possible solutions?
Solution 1:[1]
So I made it work using the suggestion of the side output stream. Thanks @whatisinthename and @kkrugler for the suggestions.
Still trying to figure out details, but here's a summary
- From the notification stream (stream 1), create a side output stream (stream 1-1).
- Use an extended class (
TempRequester) ofKeyedProcessFunction, to process the side output stream 1-1 and create Stream 2 from it. The KeyedProcessFunction has the websocket connection. - In the
openmethod of theKeyedProcessFunctioncreate the connection to websocket (handshaking etc.). Have aListStatestate to keep the list of city codes. - In the
processElementfunction ofTempRequester, check the city code coming in from side output stream 1-1. If present in ListState, do nothing. Else, send a message through websocket control channel and request city data and add the code toListState. Create a process timer (this is one time) to fire after 500 milliseconds or so. The websocket server writes the temp data very frequently and that is saved in a queue. - In the
onTimermethod, check the queue, read the data and push out (out.collect...). Create a timer again. So essentially, once the first city code gets in, we create a timer that runs every 500 milliseconds and dumps the records received out into the second stream.
Now the first and second streams can be joined downstream (I used the table API).
Not sure if this is the most elegant solution, but it worked. Thanks for the suggestions.
Here's the approximate main code:
DataStream<Event> notificationStream =
env.addSource(this.notificationSource)
.returns(TypeInformation.of(Event.class));
notificationStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
final OutputTag<String> outputTag = new OutputTag<String>("cities-seen"){};
SingleOutputStreamOperator<Event> mainDataStream = notificationStream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(
Event value,
Context ctx,
Collector<Event> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, event.cityCode);
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
DataStream<TemperatureData> temperatureStream = sideOutputStream
.keyBy(value -> value)
.process(new TempRequester());
temperatureStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
// set up the Java Table API and rest of SQL based joins ...
And the approximate code for TempRequester (ProcessFunction):
public static class TempRequester extends KeyedProcessFunction<String, String, TemperatureData> {
private ListState<String> allCities;
private volatile boolean running = true;
//This is the queue for requesting city codes
private BlockingQueue<String> messagesToSend = new ArrayBlockingQueue<>(100);
//This is the queue for receiving temperature data
private ConcurrentLinkedQueue<TemperatureData> messages = new ConcurrentLinkedQueue<TemperatureData>();
private static final int TIMEOUT = 500;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
allCities = getRuntimeContext().getListState(new ListStateDescriptor<>("List of cities seen", String.class));
... rest of websocket client setup code ...
}
@Override
public void close() throws Exception {
running = false;
super.close();
}
private boolean initialized = false;
@Override
public void processElement(String cityCode, Context ctx, Collector<TemperatureData> collector) throws Exception {
boolean citycodeFound = StreamSupport.stream(allCities.get().spliterator(), false)
.anyMatch(s -> cityCode.equals(s));
if (!citycodeFound) {
allCities.add(cityCode);
messagesToSend.put(.. add city code ..);
if (!initialized) {
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+ TIMEOUT);
initialized = true;
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<TemperatureData> out) throws Exception {
TemperatureData p;
while ((p = messages.poll()) != null) {
out.collect(p);
}
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp() + TIMEOUT);
}
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Arnab Gupta |
