Flink - Need a way to notify one stream from another

I have an Apache Flink use case that works as follows:

I have data events coming in through the first stream. Part of each event is a foreign key for which I expect data from the second stream. For example, the first stream carries data for major cities, each event with a city code, and I need the average temperature over time for that city code, streamed through the second stream. Temperatures cannot be streamed for all possible cities; we have to request each city we need data for.

So we need some way to "notify" the second stream's source, the first time we encounter a city in the first stream, that we need data for that city "pushed" to us.

This would have been easy if the notification could be sent from the first stream. The problem is that the second stream comes to us through a websocket that includes a control channel over which we have to make the request, so the request HAS to be made from the second stream.

  1. Check an event in the first stream. Read city code x.
  2. Have we seen this city code before? If not, notify the second stream that we need data for city code x.
  3. The second stream sends a message to the source requesting data for x.
  4. Data starts flowing in for city x and is used in the join downstream.
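Steps 1 and 2 amount to a first-sighting check per city code. Below is a minimal plain-Java sketch of that check only, outside Flink. `CityRequestTracker` and the `requestData` callback (standing in for "notify the second stream") are hypothetical names, and in a real Flink job the set of seen codes would live in keyed state rather than in a field.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper: remembers which city codes have been seen and
// fires the notification callback only the first time each code appears.
class CityRequestTracker {
    private final Set<String> seen = new HashSet<>();
    // Stand-in for "notify the second stream that we need data for this city"
    private final Consumer<String> requestData;

    CityRequestTracker(Consumer<String> requestData) {
        this.requestData = requestData;
    }

    // Steps 1-2: read the city code and request its data if it is new.
    // Returns true when a request was issued.
    boolean onEvent(String cityCode) {
        if (seen.add(cityCode)) { // Set.add returns false if the code is already present
            requestData.accept(cityCode);
            return true;
        }
        return false;
    }
}
```

Because `Set.add` returns `false` for a code that is already present, the membership check and the insert happen in a single call.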

If notification from the first stream were possible, this would be easy: I could do it in step 2, and data would start flowing in the second stream. But that is not possible, because the request has to be sent on the same websocket connection that feeds the second stream.

I have explored using CoProcessFunction or RichCoMapFunction for this, but it is not clear how this can be done. I have also seen some examples of the Broadcast State Pattern, but even that does not seem to fit the use case.

Can someone help me with some pointers on possible solutions?



Solution 1:[1]

So I made it work using the suggestion to use a side output stream. Thanks @whatisinthename and @kkrugler for the suggestions.

I'm still working out the details, but here's a summary:

  1. From the notification stream (stream 1), create a side output stream (stream 1-1).
  2. Use a subclass of KeyedProcessFunction (TempRequester) to process side output stream 1-1 and create stream 2 from it. The KeyedProcessFunction owns the websocket connection.
  3. In the open method of the KeyedProcessFunction, create the websocket connection (handshake, etc.). Keep a ListState of the city codes seen so far.
  4. In the processElement method of TempRequester, check the city code coming in from side output stream 1-1. If it is already in the ListState, do nothing. Otherwise, request data for that city through the websocket control channel and add the code to the ListState. Also register a processing-time timer (only once) to fire after about 500 milliseconds. The websocket server sends temperature data very frequently, and it is saved in a queue.
  5. In the onTimer method, read everything in the queue and emit it (out.collect(...)), then register the timer again. So, once the first city code arrives, a timer fires every 500 milliseconds and pushes the records received so far into the second stream.
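Steps 4 and 5 describe a hand-off between the websocket client's thread and the operator: readings are queued as they arrive, and a timer periodically drains the queue and re-arms itself. The sketch below simulates that loop in plain Java, assuming a `ScheduledExecutorService` in place of Flink's timer service and a `Consumer` in place of the `Collector`. `DrainingPoller` is a hypothetical name, not part of Flink.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical simulation of steps 4-5 (not Flink code).
class DrainingPoller<T> {
    // Filled by the websocket thread, drained by the timer (like 'messages' in TempRequester)
    private final ConcurrentLinkedQueue<T> inbox = new ConcurrentLinkedQueue<>();
    // Stand-in for Flink's timer service; a daemon thread so it never keeps the JVM alive
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "drain-timer");
        t.setDaemon(true);
        return t;
    });
    private final Consumer<T> out;   // stand-in for the Collector
    private final long intervalMs;
    private volatile boolean running = true;
    private boolean started = false; // plays the role of the 'initialized' flag

    DrainingPoller(Consumer<T> out, long intervalMs) {
        this.out = out;
        this.intervalMs = intervalMs;
    }

    // Called by the websocket client for every reading it receives.
    void onMessage(T reading) {
        inbox.offer(reading);
    }

    // Called when a city is first requested; only the first call arms the timer.
    synchronized void startOnce() {
        if (!started) {
            started = true;
            timer.schedule(this::fire, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    // Equivalent of onTimer: emit everything queued so far, then re-arm.
    private void fire() {
        T reading;
        while ((reading = inbox.poll()) != null) {
            out.accept(reading);
        }
        if (running) {
            try {
                timer.schedule(this::fire, intervalMs, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException e) {
                // stop() shut the executor down between the check and this call
            }
        }
    }

    void stop() {
        running = false;
        timer.shutdown();
    }
}
```

In Flink itself, processElement and onTimer are invoked by the same task thread, so only the queue shared with the websocket thread needs to be thread-safe; the `synchronized` on `startOnce` exists only because this simulation may call it from any thread.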

Now the first and second streams can be joined downstream (I used the Table API).

Not sure if this is the most elegant solution, but it worked. Thanks for the suggestions.

Here's the approximate main code:

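DataStream<Event> notificationStream =
        env.addSource(this.notificationSource)
                .returns(TypeInformation.of(Event.class))
                // assignTimestampsAndWatermarks returns a new stream, so chain it rather than discard the result;
                // without a TimestampAssigner this relies on the source already attaching event timestamps
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());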

final OutputTag<String> outputTag = new OutputTag<String>("cities-seen"){};

SingleOutputStreamOperator<Event> mainDataStream = notificationStream.process(new ProcessFunction<Event, Event>() {

    @Override
    public void processElement(
            Event value,
            Context ctx,
            Collector<Event> out) throws Exception {
        // emit data to regular output
        out.collect(value);

        // emit the city code to the side output
        ctx.output(outputTag, value.cityCode);
    }
});

DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);

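DataStream<TemperatureData> temperatureStream = sideOutputStream
        .keyBy(value -> value)
        .process(new TempRequester())
        // records emitted from a processing-time timer carry no timestamp, so a
        // TimestampAssigner that reads the time from TemperatureData is probably needed here
        .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());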

// set up the Java Table API and the rest of the SQL-based joins ...

And the approximate code for TempRequester (a KeyedProcessFunction):

public static class TempRequester extends KeyedProcessFunction<String, String, TemperatureData> {

    // Keyed state: since the stream is keyed by city code, this list only ever holds the current key
    private ListState<String> allCities;
    private volatile boolean running = true;
    // Queue of city codes to request over the websocket control channel
    private BlockingQueue<String> messagesToSend = new ArrayBlockingQueue<>(100);
    // Queue of temperature data received from the websocket
    private ConcurrentLinkedQueue<TemperatureData> messages = new ConcurrentLinkedQueue<>();
    // Plain field, not keyed state: one timer loop per parallel instance
    private boolean initialized = false;

    private static final int TIMEOUT = 500;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        allCities = getRuntimeContext().getListState(new ListStateDescriptor<>("List of cities seen", String.class));
        // ... rest of websocket client setup code ...
    }

    @Override
    public void close() throws Exception {
        running = false;
        super.close();
    }

    @Override
    public void processElement(String cityCode, Context ctx, Collector<TemperatureData> collector) throws Exception {
        boolean citycodeFound = StreamSupport.stream(allCities.get().spliterator(), false)
                .anyMatch(s -> cityCode.equals(s));
        if (!citycodeFound) {
            allCities.add(cityCode);
            // hand the city code to the websocket client, which sends the control-channel request
            messagesToSend.put(cityCode);
            if (!initialized) {
                // ctx.timestamp() is the element's event time (and may be null), so base
                // the processing-time timer on the current processing time instead
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + TIMEOUT);
                initialized = true;
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<TemperatureData> out) throws Exception {
        TemperatureData p;
        // drain everything the websocket has delivered since the last firing
        while ((p = messages.poll()) != null) {
            out.collect(p);
        }
        // re-arm the timer so the queue is drained every TIMEOUT ms
        ctx.timerService().registerProcessingTimeTimer(timestamp + TIMEOUT);
    }
}
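The TempRequester code puts city codes on `messagesToSend` but elides the websocket client that consumes them. One way to fill that role, sketched here under assumptions, is a dedicated sender thread that blocks on the queue and forwards each code to the control channel. `ControlChannelSender` and `sendOverControlChannel` are hypothetical names, and the real request message format is not given in the original.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical consumer of messagesToSend: takes city codes off the queue and
// forwards each one to the control channel (stubbed here as a Consumer).
class ControlChannelSender {
    private final BlockingQueue<String> messagesToSend;
    // Stand-in for the real websocket client's send call
    private final Consumer<String> sendOverControlChannel;
    private final Thread worker;
    private volatile boolean running = true;

    ControlChannelSender(BlockingQueue<String> messagesToSend, Consumer<String> sendOverControlChannel) {
        this.messagesToSend = messagesToSend;
        this.sendOverControlChannel = sendOverControlChannel;
        this.worker = new Thread(this::loop, "control-channel-sender");
        this.worker.setDaemon(true);
    }

    void start() {
        worker.start();
    }

    private void loop() {
        try {
            while (running) {
                String cityCode = messagesToSend.take(); // blocks until a request is queued
                sendOverControlChannel.accept(cityCode);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop() interrupts take() to end the loop
        }
    }

    void stop() {
        running = false;
        worker.interrupt();
    }
}
```

With this split, processElement never waits on network I/O: `put` only blocks if 100 requests are already pending. In TempRequester, open() could start such a sender and close() could stop it.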

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Arnab Gupta