'Flink Complex Event Processing not matching any pattern

It's been a while I've worked with Flink, but I cannot seem to get a simple CEP program to working. For reference, I connected to a local port and typed numbers. Reading the docs and trying many samples (most of them old) I've still not found an answer.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStream<Integer> source = env.socketTextStream("localhost", 3030, "\n")
        .map((MapFunction<String, Integer>) s -> Integer.parseInt(s) * 2);

Pattern<Integer, ?> alarmPattern = Pattern.<Integer>begin("first")
        .where(new SimpleCondition<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                System.out.println("Is it match? " + value);
                return value < 30;
            }
        });

PatternStream<Integer> pattern = CEP.pattern(source, alarmPattern);
pattern.process(new PatternProcessFunction<Integer, Object>() {
    @Override
    public void processMatch(Map<String, List<Integer>> map, Context
            context, Collector<Object> collector) throws Exception {

        System.out.println("Detected Pattern: " + map);
    }
});

source.print();
env.execute();

Above code connects to localhost 3030, and parses strings and multiplies them by 2. I verified that one is working by printing them. But neither pattern is processing, or the simple condition is being invoked at all. Looking at the execution plan (removing print), I see there is indeed a CEP operator:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Socket Stream",
    "pact" : "Data Source",
    "contents" : "Source: Socket Stream",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "GlobalCepOperator",
    "pact" : "Operator",
    "contents" : "GlobalCepOperator",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  } ]
}


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source