I want to replace the DataSet API with the DataStream API.

My Flink version is 1.13.

dataSet.first(limit).print();

How can I replace it with the DataStream API?



Solution 1:[1]

If you can't use Flink SQL, then you could write your own limit operator, something like this (warning: untested!):

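import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

// Lets through at most `limit` records in total, split as evenly as possible
// across the parallel subtasks. Records beyond the limit are still read and then
// dropped. The per-subtask counter is not checkpointed, so it starts over when
// the job restarts after a failure.
public class LimitFilter<T> extends RichFilterFunction<T> {

    private final int _limit;
    
    // How many more records this subtask may emit; computed in open().
    private transient int _remainingRecords;
    
    public LimitFilter(int limit) {
        _limit = limit;
    }
    
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // Calc number of records to return from this subtask: walk the subtasks
        // in index order, giving each an equal share of what is still left, so
        // that the shares of all subtasks add up to exactly _limit.
        int mySubtask = getRuntimeContext().getIndexOfThisSubtask();
        int remainingItems = _limit;
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        for (int i = 0; i < parallelism; i++) {
            int remainingGroups = parallelism - i;
            int itemsInGroup = remainingItems / remainingGroups;
            if (i == mySubtask) {
                _remainingRecords = itemsInGroup;
                break;
            }
            
            remainingItems -= itemsInGroup;
        }
    }
    
    @Override
    public boolean filter(T value) throws Exception {
        // Quota used up: drop every further record this subtask sees.
        if (_remainingRecords <= 0) {
            return false;
        }
        
        _remainingRecords--;
        return true;
    }

}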
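The split computed in open() can be checked in isolation. The sketch below is a standalone copy of that arithmetic in plain Java, with no Flink dependency; the class name QuotaSplit and the method quotas() are hypothetical, introduced only for this illustration.

```java
import java.util.Arrays;

// Standalone copy of the quota arithmetic from LimitFilter.open(), so it can be
// checked without a Flink cluster. QuotaSplit and quotas() are hypothetical names.
public class QuotaSplit {

    /** Returns how many records each of `parallelism` subtasks may emit. */
    public static int[] quotas(int limit, int parallelism) {
        int[] result = new int[parallelism];
        int remainingItems = limit;
        for (int i = 0; i < parallelism; i++) {
            int remainingGroups = parallelism - i;
            // Integer division rounds down, so any remainder is pushed toward
            // the later subtasks; the last subtask takes whatever is left.
            result[i] = remainingItems / remainingGroups;
            remainingItems -= result[i];
        }
        return result;
    }

    public static void main(String[] args) {
        int[] q = quotas(10, 3);
        System.out.println(Arrays.toString(q));      // [3, 3, 4]
        System.out.println(Arrays.stream(q).sum());  // 10
    }
}
```

For example, with limit = 2 and parallelism 4 the shares come out as [0, 0, 1, 1], so when the limit is smaller than the parallelism some subtasks emit nothing at all.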

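With parallelism > 1, this only works well if the data is distributed reasonably evenly across the operator's subtasks, e.g. by calling rebalance() right before the .filter(new LimitFilter<>(limit)) operator. Each subtask emits at most its own share of the limit, so a subtask that receives fewer records than its share leaves the overall total short of limit.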
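To see why the distribution matters, here is a small simulation in plain Java (no Flink; SkewDemo and totalEmitted() are hypothetical names). It assumes each subtask emits min(quota, records received), which is what LimitFilter does per subtask, and compares a skewed distribution with an even round-robin spread like the one rebalance() produces.

```java
// Hypothetical simulation, no Flink involved: each subtask emits
// min(quota, recordsItReceived), mirroring LimitFilter's per-subtask behavior.
public class SkewDemo {

    public static int totalEmitted(int[] quotas, int[] received) {
        int total = 0;
        for (int i = 0; i < quotas.length; i++) {
            total += Math.min(quotas[i], received[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        int[] quotas = {3, 3, 4};     // limit 10 split over 3 subtasks
        int[] skewed = {12, 0, 0};    // all 12 records land on subtask 0
        int[] balanced = {4, 4, 4};   // the same 12 records spread round-robin
        System.out.println(totalEmitted(quotas, skewed));    // 3
        System.out.println(totalEmitted(quotas, balanced));  // 10
    }
}
```

Even though 12 records are available, the skewed case emits only 3 because subtasks 1 and 2 never see any input to fill their shares.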

Solution 2:[2]

In the case of a DataSet the data is finite, while a DataStream can have an unbounded number of elements. You probably know this already, but it means the notion of the .first(n) elements is not the same when you are reading data that arrives continuously, sometimes out of order.
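A small illustration of that difference in plain Java (not Flink; FirstN and its methods are hypothetical names): when records arrive out of order, the first n by arrival are not the n earliest by event timestamp, and finding the latter requires seeing the whole input, which a bounded DataSet allows but an unbounded stream never fully provides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration: values are event timestamps, listed in arrival order.
public class FirstN {

    /** First n values in arrival order: all a streaming limit can act on. */
    public static List<Long> firstByArrival(List<Long> arrivals, int n) {
        return new ArrayList<>(arrivals.subList(0, Math.min(n, arrivals.size())));
    }

    /** The n smallest timestamps: this needs the complete (bounded) input. */
    public static List<Long> firstByEventTime(List<Long> arrivals, int n) {
        List<Long> sorted = new ArrayList<>(arrivals);
        Collections.sort(sorted);
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<Long> arrivals = Arrays.asList(5L, 1L, 4L, 2L, 3L);
        System.out.println(firstByArrival(arrivals, 3));    // [5, 1, 4]
        System.out.println(firstByEventTime(arrivals, 3));  // [1, 2, 3]
    }
}
```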

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: kkrugler
Solution 2: Niko