Backpressuring on an async Flow in Akka Streams

I have this flow component in Akka Streams:

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.{ExecutionContext, Future}

object ExecutionFlow {
  def apply(
      execute: In => Future[Out],
      parallelism: Int)(implicit ec: ExecutionContext): Flow[In, Out, NotUsed] =
    Flow[In]
      .async // make this section of the stream run asynchronously; essential for throughput
      .mapAsyncUnordered(parallelism) { in =>
        execute(in)
      }
}

Here In and Out are two data types, and execute is a heavyweight, expensive operation. These operations run on a dedicated thread pool executor, set up like this:

private val threadPool = new ThreadPoolExecutor(10, 10, 300, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

Here the core pool size is 10, the max pool size is 10, and 300 seconds is the keep-alive time for idle threads. Note that this is Akka explicitly handing execution over to a separate thread pool.
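To make the hand-off behavior concrete, here is a minimal, self-contained sketch (same pool shape as above; the blocking tasks are stand-ins for the real work) of one consequence of this configuration: a SynchronousQueue holds no tasks at all, so once all 10 threads are busy, an 11th concurrent submission is rejected rather than queued:

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

// Same shape as the pool above: 10 core threads, 10 max, no task queue.
val pool = new ThreadPoolExecutor(10, 10, 300, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())

val latch = new CountDownLatch(1)
// Occupy all 10 threads with blocking stand-in tasks.
(1 to 10).foreach(_ => pool.execute(() => latch.await()))

// The 11th submission cannot be queued (SynchronousQueue) and cannot
// spawn a thread (the pool is at max), so it is rejected.
val rejected =
  try { pool.execute(() => latch.await()); false }
  catch { case _: RejectedExecutionException => true }

latch.countDown()
pool.shutdown()
println(s"11th submission rejected: $rejected")
```

So any scheme that pushes more than 10 concurrent tasks at this pool does not merely spike the thread count; it risks RejectedExecutionException as well.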

Now when I use this flow, as soon as messages come into the execution flow they are delegated to the thread pool executor one after another: the flow keeps letting messages in, and the pool's active thread count suddenly spikes. I lowered the parallelism value to 1 and removed .async too, but the problem is still present.
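For illustration, a plain-Futures sketch (the execute below is a hypothetical stand-in for the real operation) of why the active count spikes: each element the stage admits calls execute(in), which eagerly submits work to the pool, so up to parallelism tasks run at once:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

val pool = Executors.newFixedThreadPool(10)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Track how many stand-in "execute" calls run at the same time.
val active = new AtomicInteger(0)
val maxActive = new AtomicInteger(0)

def execute(in: Int): Future[Int] = Future {
  val now = active.incrementAndGet()
  maxActive.getAndUpdate(m => math.max(m, now))
  Thread.sleep(50) // simulate the heavyweight operation
  active.decrementAndGet()
  in
}

// Starting 5 futures back to back, the way mapAsyncUnordered(5) keeps
// 5 elements in flight, drives the active thread count well above 1.
val fs = (1 to 5).map(execute)
Await.result(Future.sequence(fs), 5.seconds)
println(s"max concurrent executes: ${maxActive.get}")
pool.shutdown()
```

This matches what I observe: backpressure only bounds the number of futures in flight at parallelism, and every one of those futures occupies a pool thread immediately.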

When I do this however

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

object ExecutionFlow {
  def apply(execute: In => Future[Out], timeout: FiniteDuration): Flow[In, Out, NotUsed] =
    Flow[In]
      .map(in => Await.result(execute(in), timeout))
}

the problem goes away. I presume the blocking in Await.result implicitly slows the processing down, since the stage cannot accept the next element until the current call completes.
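That presumption can be checked with the same kind of stand-in sketch (hypothetical execute, same concurrency counter as before): the Await.result variant never starts call n + 1 before call n has completed, so at most one task is ever active:

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

val pool = Executors.newFixedThreadPool(10)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

val active = new AtomicInteger(0)
val maxActive = new AtomicInteger(0)

def execute(in: Int): Future[Int] = Future {
  val now = active.incrementAndGet()
  maxActive.getAndUpdate(m => math.max(m, now))
  Thread.sleep(20) // simulate the heavyweight operation
  active.decrementAndGet()
  in * 2
}

// Blocking on each future before starting the next one fully
// serializes the work, exactly like .map + Await.result in the flow.
val outs = (1 to 8).map(in => Await.result(execute(in), 1.second))

println(s"max concurrent executes: ${maxActive.get}")
pool.shutdown()
```

Here maxActive stays at 1, which is why the thread-count spike disappears, at the cost of blocking a stream thread for every element.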

I wanted to know what a good strategy would be to effectively backpressure here, so that I do not have to deal with these active-thread spikes.

I am looking at https://doc.akka.io/docs/akka/current/stream/operators/index.html#backpressure-aware-operators to see if I can come up with some scheme but I am at a loss.
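For reference, the usual Akka recommendation for blocking or expensive work is a dedicated dispatcher rather than a raw ThreadPoolExecutor; a sketch of that configuration, assuming a hypothetical dispatcher name, along the lines of the pattern in the Akka docs:

```hocon
# application.conf: a dedicated fixed-size dispatcher for the expensive execute calls
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 10
  }
  throughput = 1
}
```

The flow could then be run on it via .withAttributes(ActorAttributes.dispatcher("blocking-io-dispatcher")), keeping the blocking work off the default dispatcher while mapAsyncUnordered's parallelism bounds how many elements are in flight, but I am not sure this alone addresses the spikes.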



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
