'Flink Complex Event Processing not matching any pattern
It's been a while I've worked with Flink, but I cannot seem to get a simple CEP program to working. For reference, I connected to a local port and typed numbers. Reading the docs and trying many samples (most of them old) I've still not found an answer.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> source = env.socketTextStream("localhost", 3030, "\n")
.map((MapFunction<String, Integer>) s -> Integer.parseInt(s) * 2);
Pattern<Integer, ?> alarmPattern = Pattern.<Integer>begin("first")
.where(new SimpleCondition<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
System.out.println("Is it match? " + value);
return value < 30;
}
});
PatternStream<Integer> pattern = CEP.pattern(source, alarmPattern);
pattern.process(new PatternProcessFunction<Integer, Object>() {
@Override
public void processMatch(Map<String, List<Integer>> map, Context
context, Collector<Object> collector) throws Exception {
System.out.println("Detected Pattern: " + map);
}
});
source.print();
env.execute();
Above code connects to localhost 3030, and parses strings and multiplies them by 2. I verified that one is working by printing them. But neither pattern is processing, or the simple condition is being invoked at all. Looking at the execution plan (removing print), I see there is indeed a CEP operator:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Socket Stream",
"pact" : "Data Source",
"contents" : "Source: Socket Stream",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "GlobalCepOperator",
"pact" : "Operator",
"contents" : "GlobalCepOperator",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
} ]
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
