Akka stream hangs when starting more than 15 external processes using ProcessBuilder

I'm building an app that has the following flow:

  1. There is a source of items to process
  2. Each item should be processed by an external command (in the end it will be ffmpeg, but for this simple reproducible example it is just cat, which passes the data through unchanged)
  3. In the end, the output of that external command is saved somewhere (again, for the sake of this example, it is just saved to a local text file)

So I'm doing the following operations:

  1. Prepare a source with items
  2. Build an Akka graph that uses Broadcast to fan out the source items to individual flows
  3. Each individual flow uses ProcessBuilder together with Flow.fromSinkAndSource to turn the external process execution into a flow
  4. End each individual flow with a sink that saves the data to a file.

Complete code example:

import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MyApp extends App {

  // When this is changed to something above 15, the graph just stops
  val PROCESSES_COUNT = Integer.parseInt(args(0))

  println(s"Running with ${PROCESSES_COUNT} processes...")

  implicit val system                          = ActorSystem("MyApp")
  implicit val globalContext: ExecutionContext = ExecutionContext.global

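  def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
    // The process is started eagerly, when this Flow is built (not when the graph runs)
    val convertProcess = new ProcessBuilder(cmd).start
    val pipeIn         = new BufferedOutputStream(convertProcess.getOutputStream) // process stdin
    val pipeOut        = new BufferedInputStream(convertProcess.getInputStream)   // process stdout
    // Upstream elements are written to the process's stdin; its stdout becomes this Flow's output
    Flow
      .fromSinkAndSource(StreamConverters.fromOutputStream(() => pipeIn), StreamConverters.fromInputStream(() => pipeOut))
  }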

  val source = Source(1 to 100)
    .map(element => {
      println(s"--emit: ${element}")
      ByteString(element)
    })

  val sinksList = (1 to PROCESSES_COUNT).map(i => {
    Flow[ByteString]
      .via(executeCmdOnStream("cat"))
      .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
  })

  val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>

    val broadcast = builder.add(Broadcast[ByteString](sinks.size))
    source ~> broadcast.in
    for (i <- broadcast.outlets.indices) {
      broadcast.out(i) ~> sinks(i)
    }
    ClosedShape
  }

  Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)

}

Run it with the following command:

sbt "run PROCESSES_COUNT"

e.g.

sbt "run 15"

This all works well until I raise the number of external processes (PROCESSES_COUNT in the code). With 15 or fewer, everything goes well, but with 16 or more the following happens:

  1. The whole execution hangs after emitting the first 16 items (AFAIK, 16 is Akka's default buffer size)
  2. I can see that the cat processes are started on the system (all 16 of them)
  3. When I manually kill one of these cat processes, something frees up and processing continues (of course, one output file then ends up empty because I killed its command)
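The symptoms above (a hang at exactly 16 processes that clears as soon as one cat is killed) are consistent with starvation of a fixed-size thread pool whose threads are all stuck in blocking calls. The following plain-JVM sketch is not Akka code: PoolStarvationDemo is a hypothetical name, and each "blocked stage" is modelled as a task waiting on a CountDownLatch. It fills a 16-thread pool (16 being, as far as I know, the default size of Akka 2.6's default-blocking-io-dispatcher) and checks whether one more task can run before and after a single blocked task is released.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}

object PoolStarvationDemo {

  /** Saturates a fixed pool with blocked tasks, then checks whether one extra task can run.
    * Returns (ranWhileSaturated, ranAfterOneReleased). */
  def demo(poolSize: Int): (Boolean, Boolean) = {
    val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

    // One latch per blocked task; each task stands in for a stage stuck reading a process's stdout
    val releases = Vector.fill(poolSize)(new CountDownLatch(1))
    releases.foreach { latch =>
      pool.submit(new Runnable { def run(): Unit = latch.await() })
    }

    // One extra task, standing in for a stage that would feed data to the processes
    val extraRan = new CountDownLatch(1)
    pool.submit(new Runnable { def run(): Unit = extraRan.countDown() })

    // Every thread is blocked, so the extra task can only sit in the queue
    val ranWhileSaturated = extraRan.await(500, TimeUnit.MILLISECONDS)

    // Unblock a single task, similar to killing one cat process
    releases.head.countDown()
    val ranAfterOneReleased = extraRan.await(5, TimeUnit.SECONDS)

    // Release the remaining tasks so the pool can shut down cleanly
    releases.foreach(_.countDown())
    pool.shutdown()
    (ranWhileSaturated, ranAfterOneReleased)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo(16)
    println(s"extra task ran while the pool was saturated: $before")
    println(s"extra task ran after one task was released: $after")
  }
}
```

PoolStarvationDemo.demo(16) should return (false, true): the extra task stays queued while every thread is blocked, and runs on the thread freed by releasing one task.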

I verified that the external process execution is definitely the cause (and not, e.g., a limit of Akka's Broadcast itself).
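One way to run such a check is to keep the same Broadcast fan-out but replace each external process and file sink with an in-memory counting sink. A minimal sketch, assuming Akka 2.6 (where an implicit ActorSystem provides the Materializer); BroadcastOnlyCheck is a hypothetical name. If Broadcast itself were the bottleneck, a branch count above 16 would hang here as well.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object BroadcastOnlyCheck {

  // Same fan-out shape as MyApp, but every branch ends in an in-memory counting sink
  // instead of an external process followed by a file sink.
  def run(branches: Int)(implicit system: ActorSystem): Seq[Int] = {
    import system.dispatcher // ExecutionContext for Future.sequence

    val source = Source(1 to 100).map(i => ByteString(i))
    val sinks  = (1 to branches).map(_ => Sink.fold[Int, ByteString](0)((count, _) => count + 1))

    val graph = GraphDSL.create(sinks) { implicit builder => sinkShapes =>
      val broadcast = builder.add(Broadcast[ByteString](sinkShapes.size))
      source ~> broadcast.in
      for (i <- broadcast.outlets.indices) {
        broadcast.out(i) ~> sinkShapes(i)
      }
      ClosedShape
    }

    // Each materialized value is the number of elements one branch received
    Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), 30.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("BroadcastOnlyCheck")
    try println(run(32))
    finally system.terminate()
  }
}
```

With an implicit ActorSystem in scope, BroadcastOnlyCheck.run(32) should return 32 counts of 100, since no stage in this graph blocks a thread.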

I recorded a video showing both situations (first 15 processes working fine, then 16 processes hanging until killing one of them frees things up): link to the video

Both the code and video are in this repo

I'd appreciate any help or suggestions on where to look for a solution.



Solution 1:[1]

It turns out this was caused by a limit in Akka's configuration of the blocking IO dispatcher's thread pool.

So raising that pool size to something greater than the number of streams fixed the issue:

akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size = 50
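The same setting can also be applied in code instead of application.conf, which lets the pool size follow PROCESSES_COUNT. A minimal sketch, assuming Akka 2.6 and Typesafe Config on the classpath; the BlockingPoolConfig object and its sizing rule (two threads per process plus a small margin, never below 16) are hypothetical, not something the answer prescribes. The idea behind a per-process factor is that each process's StreamConverters source and sink run on this dispatcher, and the source can sit in a blocking read for a long time.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

object BlockingPoolConfig {
  val PoolSizePath = "akka.actor.default-blocking-io-dispatcher.thread-pool-executor.fixed-pool-size"

  // Hypothetical sizing rule: two blocking threads per external process plus a margin.
  // The factor is an assumption, not a measured requirement.
  def poolSizeFor(processCount: Int): Int = math.max(16, 2 * processCount + 4)

  // Overrides only the pool size; everything else comes from application.conf / reference.conf
  def configFor(poolSize: Int): Config =
    ConfigFactory
      .parseString(s"$PoolSizePath = $poolSize")
      .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val processCount = Integer.parseInt(args(0))
    println(s"default $PoolSizePath = ${ConfigFactory.load().getInt(PoolSizePath)}")
    val system = ActorSystem("MyApp", configFor(poolSizeFor(processCount)))
    println(s"effective $PoolSizePath = ${system.settings.config.getInt(PoolSizePath)}")
    system.terminate()
  }
}
```

In MyApp, this would mean creating the system with something like ActorSystem("MyApp", BlockingPoolConfig.configFor(BlockingPoolConfig.poolSizeFor(PROCESSES_COUNT))) instead of ActorSystem("MyApp"). On Akka 2.6 the printed default should be 16, which would match the 16-process threshold seen in the question.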

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Adam Szmyd