I want to replace the DataSet API with the DataStream API.
My Flink version is 1.13.
```java
dataSet.first(limit).print();
```
How can I replace this with the DataStream API?
Solution 1:[1]
If you can't use Flink SQL, then you could write your own limit operator, something like (warning, untested!):
```java
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class LimitFilter<T> extends RichFilterFunction<T> {

    private final int _limit;

    // Number of records this subtask may still emit; computed in open().
    private transient int _remainingRecords;

    public LimitFilter(int limit) {
        _limit = limit;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Calc number of records to return from this subtask: split _limit as
        // evenly as possible across all parallel subtasks, so that the
        // per-subtask quotas add up to exactly _limit.
        int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
        int remainingItems = _limit;
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        for (int i = 0; i < parallelism; i++) {
            int remainingGroups = parallelism - i;
            int itemsInGroup = remainingItems / remainingGroups;
            if (i == mySubtask) {
                _remainingRecords = itemsInGroup;
                break;
            }
            remainingItems -= itemsInGroup;
        }
    }

    @Override
    public boolean filter(T value) throws Exception {
        // Drop every record once this subtask's quota is used up.
        if (_remainingRecords <= 0) {
            return false;
        }
        _remainingRecords--;
        return true;
    }
}
```
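To see how open() splits the limit, the same loop can be replayed in plain Java for every subtask index. This is a sketch: the class `LimitQuotaDemo` and its `quotas` method are hypothetical helpers, not part of Flink; they only copy the arithmetic from `LimitFilter.open()`.

```java
import java.util.Arrays;

// Hypothetical helper (plain Java, no Flink): replays the loop from
// LimitFilter.open() for each subtask index to show how the limit is split.
public class LimitQuotaDemo {

    // Returns the quota that each subtask 0..parallelism-1 would compute.
    static int[] quotas(int limit, int parallelism) {
        int[] result = new int[parallelism];
        for (int subtask = 0; subtask < parallelism; subtask++) {
            int remainingItems = limit;
            for (int i = 0; i < parallelism; i++) {
                int remainingGroups = parallelism - i;
                int itemsInGroup = remainingItems / remainingGroups;
                if (i == subtask) {
                    result[subtask] = itemsInGroup;
                    break;
                }
                remainingItems -= itemsInGroup;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[] q = quotas(10, 3);
        System.out.println(Arrays.toString(q));     // prints [3, 3, 4]
        System.out.println(Arrays.stream(q).sum()); // prints 10
    }
}
```

The integer division pushes the remainder toward the higher subtask indices, and the quotas always sum to the limit; for example, `quotas(2, 4)` gives `[0, 0, 1, 1]`.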
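With parallelism > 1, this only works well if the records are distributed reasonably evenly across the parallel subtasks, e.g. by calling rebalance() right before the operator: dataStream.rebalance().filter(new LimitFilter<>(limit)). Each subtask passes at most its own share of the limit, so a subtask that receives fewer records than its share leaves the overall output short of limit records.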
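The effect of skew can be simulated without Flink. In the sketch below, which is a hypothetical plain-Java model and not Flink code, each record is routed to one subtask and passes only if that subtask still has quota left. Round-robin routing stands in for rebalance(); the quotas `{3, 3, 4}` are the ones `LimitFilter` computes for limit = 10 and parallelism = 3.

```java
// Hypothetical simulation (plain Java, no Flink): counts how many records
// survive per-subtask quotas under two different routings of records to subtasks.
public class LimitSkewDemo {

    // quotas[i] = records subtask i may pass; routing[r] = subtask receiving record r.
    static int emitted(int[] quotas, int[] routing) {
        int[] remaining = quotas.clone();
        int passed = 0;
        for (int subtask : routing) {
            if (remaining[subtask] > 0) {
                remaining[subtask]--;
                passed++;
            }
        }
        return passed;
    }

    // Round-robin routing, similar in spirit to what rebalance() does.
    static int[] roundRobin(int records, int parallelism) {
        int[] routing = new int[records];
        for (int r = 0; r < records; r++) {
            routing[r] = r % parallelism;
        }
        return routing;
    }

    public static void main(String[] args) {
        int[] quotas = {3, 3, 4};    // limit = 10, parallelism = 3
        int[] skewed = new int[100]; // all 100 records land on subtask 0
        System.out.println(emitted(quotas, skewed));                 // prints 3
        System.out.println(emitted(quotas, roundRobin(100, 3)));     // prints 10
    }
}
```

With all records on one subtask, only that subtask's share (3) gets through; spreading the records evenly fills every quota and yields the full 10.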
Solution 2:[2]
In the case of a DataSet, the data is finite, while for a DataStream the number of elements can be unbounded. As you probably already know, the notion of "the first n elements" (.first(n)) is therefore not the same when you are reading data that arrives continuously and sometimes out of order.
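A small plain-Java illustration of this point (hypothetical, no Flink involved): the same four events, identified here by their timestamps, arrive in two different orders, and "take the first n in arrival order" returns different results depending on which order occurred.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration (plain Java, no Flink): "first n" taken in arrival
// order depends on how the records happen to arrive, so it need not match
// the first n by event timestamp.
public class FirstNArrivalDemo {

    // Take the first n elements in the order they arrive.
    static List<Integer> firstN(List<Integer> arrivals, int n) {
        return arrivals.stream().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same four events (identified by timestamp), delivered in two orders.
        List<Integer> inOrder = List.of(1, 2, 3, 4);
        List<Integer> outOfOrder = List.of(3, 1, 4, 2);
        System.out.println(firstN(inOrder, 2));    // prints [1, 2]
        System.out.println(firstN(outOfOrder, 2)); // prints [3, 1]
    }
}
```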
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | kkrugler |
| Solution 2 | Niko |
