'Backpressuring on async Flow in Akka Streams
I have this flow component in Akka streams
object ExecutionFlow {
def apply(
execute: In => Future[Out],
parallelism: Int)(implicit ec: ExecutionContext): Flow[In, Out, NotUsed] =
Flow[In]
.async // make the streams run asynchronously. essential for throughput
.mapAsyncUnordered(parallelism){
case (in) => execute(in)
}
}
Here In and Out are two data types and execute is a heavy weight expensive operation. There is a thread pool executor on which these operations run. Its set up like this
private val threadPool = new ThreadPoolExecutor(10, 10, 300, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
Here corepool size is 10, maxPool size is 10, 300 is the ttl in seconds. Note that this is akka explicitly handing over execution to another thread pool.
Now when I use this flow, as soon as messages come into the Execution flow its delegated to this thread pool executor and and a new one comes in, the flow lets messages in and the threadpool suddenly spikes up in its active thread count. I lowered the parallelism value down to 1 and removed .async too but the problem is still present.
When I do this however
object ExecutionFlow {
def apply(
execute: In => Future[Out], timeout: FiniteDuration): Flow[In, Out, NotUsed] =
Flow[In]
.map){ case (in) => Await.result(execute(in), timeout) }
}
This problem goes away. I presume the nature of the blocking somehow implicitly slows down the processing here.
I wanted to know what would be a good strategy to effectively back pressure here so that I do not have to deal with active thread usage spikes.
I am looking at https://doc.akka.io/docs/akka/current/stream/operators/index.html#backpressure-aware-operators to see if I can come up with some scheme but I am at a loss.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
