Flink - How to resolve backpressure in Filter Function?

With Flink v1.13.2 using RocksDB, I am measuring backpressure in Grafana with the following query: sum by (task_name)(flink_taskmanager_job_task_backPressuredTimeMsPerSecond{job="..."})

Some filter functions (ProcessFilter_A, ProcessFilter_B, C, D) suffer from backpressure, but the downstream process functions work fine (there is no backpressure on either the process functions or the MongoDB sink). I have no idea how to resolve the backpressure in the filter functions.

What could be the possible reasons?

I also wonder: since I am using RocksDB, will the KafkaObject in the aggregate function be stored in RocksDB?

Here are my filter functions (which suffer from backpressure):

private void myStream(DataStream<KafkaObject> kafkaSource)
    {
       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )
                .name("ProcessFilter_A")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(2), Time.minutes(30)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup A"))
                .name("ProcessA")
                .addSink(new MongoDbSink())
                ;

       kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )            
                .name("ProcessFilter_B")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.hours(1), Time.minutes(20)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup B"))
                .name("ProcessB")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )           
                .name("ProcessFilter_C")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(30), Time.minutes(10)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup C"))
                .name("ProcessC")
                .addSink(new MongoDbSink())
                ;

        kafkaSource
                .filter(log -> log.contains("x") || log.contains("y") )              
                .name("ProcessFilter_D")
                .keyBy(KafkaObject::getInstanceA)
                .window(TumblingProcessingTimeWindows.of(Time.minutes(15)))
                .aggregate(new MyAggregateCounter(), new MyProcessFunction("setup D"))
                .name("ProcessD")
                .addSink(new MongoDbSink())
                ;
    }
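All four pipelines apply the identical predicate, so one option is to filter once and fan the filtered stream out into the four windowed aggregations (a Flink DataStream can be consumed by multiple downstream operators). The shared predicate itself can also be extracted and unit-tested in plain Java. In this sketch, String stands in for KafkaObject, since KafkaObject's contains(...) is not shown:

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SharedFilter {
    // The predicate duplicated across ProcessFilter_A..D, extracted once.
    // String stands in for KafkaObject here.
    public static final Predicate<String> X_OR_Y =
            log -> log.contains("x") || log.contains("y");

    // Applies the shared predicate, mirroring what each .filter(...) call does.
    public static List<String> keep(List<String> logs) {
        return logs.stream().filter(X_OR_Y).collect(Collectors.toList());
    }
}
```

Extracting the predicate to one named constant also guarantees the four pipelines cannot drift apart if the condition ever changes.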

Here is the aggregate function:

public class MyAggregateCounter implements AggregateFunction<KafkaObject, Tuple3<Long, Long, Long>, Tuple3<Long,Long,Long>>
{
    @Override
    public Tuple3<Long, Long, Long> createAccumulator()
    {
        return Tuple3.of(0L, 0L, 0L);
    }

    @Override
    public Tuple3<Long, Long, Long> add(KafkaObject value, Tuple3<Long, Long, Long> accumulator)
    {
        if (...)
        {
            accumulator.f0 += 1;
        }
        else if (...)
        {
            accumulator.f1 += 1;
        }
        else if (...)
        {
            accumulator.f2 += 1;
        }
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> getResult(Tuple3<Long, Long, Long> accumulator)
    {
        return accumulator;
    }

    @Override
    public Tuple3<Long, Long, Long> merge(Tuple3<Long, Long, Long> a, Tuple3<Long, Long, Long> b)
    {
        return Tuple3.of(a.f0 + b.f0, a.f1 + b.f1, a.f2 + b.f2);
    }
}
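Regarding the RocksDB question: because the window uses an incremental AggregateFunction, only the accumulator (the Tuple3) is kept in window state, not the incoming KafkaObject records. Since the branch conditions above are elided, here is a plain-Java sketch of the accumulator mechanics only (no Flink dependency); the startsWith checks are hypothetical stand-ins for the real `if (...)` conditions:

```java
public class CounterSketch {
    // Mirrors MyAggregateCounter's Tuple3<Long, Long, Long> accumulator as a long[3].
    public static long[] createAccumulator() {
        return new long[] {0L, 0L, 0L};
    }

    // The startsWith checks below are hypothetical; the real conditions are elided.
    public static long[] add(String value, long[] acc) {
        if (value.startsWith("a"))      acc[0] += 1;
        else if (value.startsWith("b")) acc[1] += 1;
        else                            acc[2] += 1;
        return acc;
    }

    // Pairwise sum, exactly as MyAggregateCounter.merge does.
    public static long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1], a[2] + b[2]};
    }
}
```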

UPDATE: David was right; the problem was MongoDB. Even though I could not relate the filter backpressure to MongoDB (the MongoDB sink itself showed no backpressure), after replacing the MongoDB sink with a NoSink implementation everything was okay.

In the open method of the sink function I was creating N com.mongodb.MongoClient instances, where N is the parallelism. I have now changed that to one instance per TaskManager (using the singleton pattern).
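The per-TaskManager sharing described above can be sketched with a double-checked-locking singleton holder. In this sketch, Object stands in for com.mongodb.MongoClient so the code compiles without the driver:

```java
public final class ClientHolder {
    // One shared client per JVM, i.e. per TaskManager; volatile makes the
    // double-checked locking below safe. In the real sink this field would
    // be a com.mongodb.MongoClient.
    private static volatile Object client;

    private ClientHolder() {}

    // Called from the sink's open(); every parallel subtask running in the
    // same TaskManager JVM gets the same instance instead of opening its
    // own connection.
    public static Object get() {
        Object local = client;
        if (local == null) {
            synchronized (ClientHolder.class) {
                local = client;
                if (local == null) {
                    client = local = new Object(); // replace with new MongoClient(...)
                }
            }
        }
        return local;
    }
}
```

Note that a static singleton is shared per classloader, so with the default per-job classloading each job gets its own client per TaskManager.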

Even though it now seems okay, I believe Flink needs a pre-defined MongoDB sink (like the Kafka one). Flink also needs better indicators to pinpoint the real cause, because I would never have found on my own that the problem was MongoDB.



Solution 1:

The MongoDB sink is the most likely cause of the backpressure you are observing in the filter functions. You can verify this by replacing the sink with a discarding sink, and checking to see if that eliminates the backpressure.

See https://stackoverflow.com/a/54386180/2000823 for an example.

To pinpoint the bottleneck, you can use Flink's built-in backpressure monitoring and flame graphs.

If you do that you may find that MongoDB and/or Flink needs to be scaled up. You may also find that your job is spending most of its time performing serialization/deserialization -- optimizing serialization is one of the best ways to increase throughput.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
