Cannot infer type-variable(s) R in Flink ProcessWindowFunction
I am having trouble resolving this compile error in Flink (version 1.11.0):
java: no suitable method found for process(com.xyz.myPackage.operators.windowed.ComputeFeatures)
method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>) is not applicable
(cannot infer type-variable(s) R
(argument mismatch; com.xyz.myPackage.operators.windowed.ComputeFeatures cannot be converted to org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>))
method org.apache.flink.streaming.api.datastream.WindowedStream.<R>process(org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction<com.xyz.myPackage.entities.StreamElement,R,java.lang.Long,org.apache.flink.streaming.api.windowing.windows.TimeWindow>,org.apache.flink.api.common.typeinfo.TypeInformation<R>) is not applicable
(cannot infer type-variable(s) R
(actual and formal argument lists differ in length))
This is how I create a keyed windowed stream:
timestampedStreamElementDataStream
    .keyBy(StreamElement::getId)
    .window(SlidingEventTimeWindows.of(Time.seconds(600), Time.seconds(60)))
    .process(new ComputeFeatures());
And here is what my ComputeFeatures function looks like:
public class ComputeFeatures extends ProcessWindowFunction<
        StreamElement,
        StreamElement,
        Long,
        TimeWindow> {
    @Override
    public void process(Long key,
                        Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {
        System.out.println("In windowed function");
    }
}
StreamElement::getId returns a Long, so the types should all line up, but the compiler still fails to infer R. I am looking for ideas on how to solve this.
NOTE: This issue seems related but it didn't fit my problem: LINK
EDIT 1:
As suggested by David, I tried autogenerating the overridden process function with IntelliJ, but the issue remains the same. With the type arguments specified, the autogenerated code looks like this:
public class ComputeFeatures extends ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow> {
    @Override
    public void process(Long aLong,
                        ProcessWindowFunction<StreamElement, StreamElement, Long, TimeWindow>.Context context,
                        Iterable<StreamElement> elements,
                        Collector<StreamElement> out) throws Exception {
    }
}
And like this when I omit the type arguments:
public class ComputeFeatures extends ProcessWindowFunction {
    @Override
    public void process(Object o, Context context, Iterable elements, Collector out) throws Exception {
        System.out.println("In windowed function");
    }
}
EDIT 2:
Maybe relevant: When I hover over new ComputeFeatures() IntelliJ displays this infobox:
Required type:
ProcessWindowFunction
<com.xyz.myPackage.entities.StreamElement,
R,
java.lang.Long,
org.apache.flink.streaming.api.windowing.windows.TimeWindow>
Provided:
ComputeFeatures
reason: no instance(s) of type variable(s) R exist so that ComputeFeatures conforms to ProcessWindowFunction<StreamElement, R, Long, TimeWindow>
Solution 1:[1]
Eh, stupid mistake: the code works as it is. The problem was that IntelliJ had imported the wrong ProcessWindowFunction (the Scala variant). After changing the import, everything worked as expected.
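To illustrate why a wrong import produces this particular message, here is a minimal sketch that needs no Flink dependency. JavaApi, ScalaApi, WindowedStreamSketch, GoodFunction and WrongVariantFunction are all hypothetical stand-ins, not Flink classes. They only mimic the situation where two unrelated classes share the simple name ProcessWindowFunction. In the real project, the Java variant is org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction (the one named in the error message), while the Scala variant lives, as far as I know, under org.apache.flink.streaming.api.scala.function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Entry point of the sketch. Every class in this file is a hypothetical stand-in.
class ImportMismatchDemo {
    public static void main(String[] args) {
        WindowedStreamSketch<String, Long> stream =
                new WindowedStreamSketch<>(7L, Arrays.asList("a", "b"));

        // Compiles: GoodFunction extends the variant that process() accepts,
        // so the compiler can infer R = String.
        System.out.println(stream.process(new GoodFunction()));

        // Does not compile if uncommented: WrongVariantFunction has the same
        // type arguments but an unrelated superclass, so no choice of R fits.
        // stream.process(new WrongVariantFunction());
    }
}

// Stand-in for the Java API package.
final class JavaApi {
    abstract static class ProcessWindowFunction<IN, OUT, KEY> {
        abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
    }
}

// Stand-in for the Scala API package: same simple name, unrelated type.
final class ScalaApi {
    abstract static class ProcessWindowFunction<IN, OUT, KEY> {
        abstract void process(KEY key, Iterable<IN> elements, List<OUT> out);
    }
}

// Mimics a windowed stream: T and K are fixed, only R is inferred per call.
final class WindowedStreamSketch<T, K> {
    private final K key;
    private final List<T> elements;

    WindowedStreamSketch(K key, List<T> elements) {
        this.key = key;
        this.elements = elements;
    }

    <R> List<R> process(JavaApi.ProcessWindowFunction<T, R, K> function) {
        List<R> out = new ArrayList<>();
        function.process(key, elements, out);
        return out;
    }
}

// Extends the accepted variant.
final class GoodFunction extends JavaApi.ProcessWindowFunction<String, String, Long> {
    @Override
    void process(Long key, Iterable<String> elements, List<String> out) {
        for (String element : elements) {
            out.add(key + ":" + element);
        }
    }
}

// Looks identical in source, but extends the other variant.
final class WrongVariantFunction extends ScalaApi.ProcessWindowFunction<String, String, Long> {
    @Override
    void process(Long key, Iterable<String> elements, List<String> out) {
        // Intentionally empty: only the superclass matters for this sketch.
    }
}
```

The two function classes differ only in which ProcessWindowFunction they extend, which is exactly what an import line decides. That is why the class body can look correct while the call site still fails.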
Solution 2:[2]
I'm not sure what's wrong, but in cases like this I usually debug the type mismatch by having IntelliJ generate the method for me -- so in this case, the overridden process method -- so I can see what it thinks is going on.
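A different quick check in the same spirit, offered as a sketch rather than an established recipe: print the fully qualified name of the function's superclass via reflection, which shows the package the class was actually imported from. superclassOf is a hypothetical helper, and ArrayList is only a stand-in so the snippet runs without Flink on the classpath.

```java
import java.util.ArrayList;

class SuperclassCheck {
    // Hypothetical helper: fully qualified name of the direct superclass.
    static String superclassOf(Object function) {
        return function.getClass().getSuperclass().getName();
    }

    public static void main(String[] args) {
        // ArrayList stands in for the function object; in the Flink project
        // the argument would be new ComputeFeatures().
        System.out.println(superclassOf(new ArrayList<String>()));
    }
}
```

Applied to ComputeFeatures, the printed name should match the package shown in the compiler error (org.apache.flink.streaming.api.functions.windowing). A different package would point to a wrong import.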
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Amuoeba |
| Solution 2 | David Anderson |
