Akka stream hangs when launching more than 15 external processes with ProcessBuilder

Ada*_*myd 1 scala processbuilder akka akka-stream

I am building an application with the following flow:

  1. There is a source of items to be processed
  2. Each item should be processed by an external command (it will be ffmpeg in the end, but for this simple reproducible use case it is just cat, which passes the data through)
  3. Finally, the output of such an external command is saved somewhere (again, in this example it is simply saved to a local text file)

So I am doing the following:

  1. Prepare a source containing the items
  2. Build an Akka graph that uses Broadcast to fan the source items out into individual flows
  3. In each individual flow, use ProcessBuilder together with Flow.fromSinkAndSource to build a flow for this external process execution
  4. End each individual flow with a sink that saves the data to a file.

Full code example:

import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MyApp extends App {

  // When this is changed to something above 15, the graph just stops
  val PROCESSES_COUNT = Integer.parseInt(args(0))

  println(s"Running with ${PROCESSES_COUNT} processes...")

  implicit val system                          = ActorSystem("MyApp")
  implicit val globalContext: ExecutionContext = ExecutionContext.global

  def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
    val convertProcess = new ProcessBuilder(cmd).start
    val pipeIn         = new BufferedOutputStream(convertProcess.getOutputStream)
    val pipeOut        = new BufferedInputStream(convertProcess.getInputStream)
    Flow
      .fromSinkAndSource(StreamConverters.fromOutputStream(() => pipeIn), StreamConverters.fromInputStream(() => pipeOut))
  }

  val source = Source(1 to 100)
    .map(element => {
      println(s"--emit: ${element}")
      ByteString(element)
    })

  val sinksList = (1 to PROCESSES_COUNT).map(i => {
    Flow[ByteString]
      .via(executeCmdOnStream("cat"))
      .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
  })

  val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>

    val broadcast = builder.add(Broadcast[ByteString](sinks.size))
    source ~> broadcast.in
    for (i <- broadcast.outlets.indices) {
      broadcast.out(i) ~> sinks(i)
    }
    ClosedShape
  }

  Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)

}

Run it with:

sbt "run PROCESSES_COUNT"

e.g.

sbt "run 15"

This all works well until I increase the number of "external processes" (PROCESSES_COUNT in the code). When it is 15 or less, everything works, but when it is 16 or more, the following happens:

  1. The whole execution hangs after emitting the first 16 items (that number of 16 items is Akka's default buffer size, AFAIK)
  2. I can see the cat processes started in the system (all 16 of them)
  3. When I manually kill one of those cat processes in the system, something gets released and the processing continues (of course, in the result one file is empty, because I killed its processing command)

I have checked that this is definitely caused by the external execution (and not by a limitation of Akka Broadcast itself).

I recorded a video showing both cases (first, 15 items working fine, then 16 items hanging and getting released by killing one of the processes) - video link

Both the code and the video are in this repository.

I would appreciate any help or suggestions on where to look for a solution to this problem.

Iva*_*iuc 5

It is an interesting problem, and it looks like the stream is deadlocking. Increasing the number of threads may fix the symptom, but not the underlying problem.

The problem is the following code:

Flow
  .fromSinkAndSource(
    StreamConverters.fromOutputStream(() => pipeIn),
    StreamConverters.fromInputStream(() => pipeOut)
  )

Both fromInputStream and fromOutputStream use the same default-blocking-io-dispatcher, as you correctly noticed. The reason a dedicated thread pool is used at all is that both perform Java API calls that block the running thread.
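That shared pool also explains the magic number 16: in Akka's own reference.conf (recent Akka versions; verify against the version you run), the default blocking IO dispatcher is a fixed pool of 16 threads:

```
# From Akka's reference.conf (check your Akka version for the exact values)
default-blocking-io-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```

So 16 simultaneously blocked reads are exactly enough to exhaust it.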

Here is a part of a thread stack trace of fromInputStream that shows where blocking is happening.

at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon$1.onPull(InputStreamSource.scala:63)

Now, you're running 16 simultaneous Sinks that are connected to a single Source. To support back-pressure, a Source will only produce an element when all Sinks send a pull command.
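This kind of thread-pool starvation can be reproduced without Akka at all. Below is a minimal sketch (plain JDK pipes and executors; all names are hypothetical, not from the original code): a blocking read and the write that would unblock it are submitted to the same fixed pool. With a single thread the read starves, just like the 16 readBytes calls here; with a spare thread it completes.

```scala
import java.io.{PipedInputStream, PipedOutputStream}
import java.util.concurrent.{Executors, TimeUnit, TimeoutException}

object PoolStarvationDemo {

  /** Returns true when a blocking read starves on a fixed pool of `nThreads`. */
  def starves(nThreads: Int): Boolean = {
    val out  = new PipedOutputStream()
    val in   = new PipedInputStream(out)
    val pool = Executors.newFixedThreadPool(nThreads)

    // The blocking read is submitted first and occupies a pool thread...
    val read = pool.submit(new Runnable {
      def run(): Unit = { in.read(); () }
    })
    // ...while the write that would unblock it waits in the queue behind it.
    pool.submit(new Runnable {
      def run(): Unit = { out.write(42); out.flush() }
    })

    val starved =
      try { read.get(500, TimeUnit.MILLISECONDS); false }
      catch { case _: TimeoutException => true }

    pool.shutdownNow() // interrupt the stuck reader, if any
    starved
  }

  def main(args: Array[String]): Unit = {
    println(s"1 thread  -> starved=${starves(1)}") // the 16-process case in miniature
    println(s"2 threads -> starved=${starves(2)}") // spare capacity for the writer
  }
}
```

With one thread the writer can never be scheduled, so the reader blocks forever; adding a second thread (or, in the Akka case, a second pool) breaks the cycle.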

What happens next is that you have 16 simultaneous calls to the FileInputStream.readBytes method, which immediately block all the threads of the default-blocking-io-dispatcher. No threads are left for fromOutputStream to write any data from the Source, or to do any other kind of work. Hence, you have a deadlock.

If you increase the threads in the pool, the problem goes away, but that only removes the symptom.

The proper solution is to run fromOutputStream and fromInputStream on two separate thread pools. Here is how you can do it:

// requires: import akka.stream.ActorAttributes
Flow
  .fromSinkAndSource(
    StreamConverters
      .fromOutputStream(() => pipeIn)
      .addAttributes(ActorAttributes.dispatcher("blocking-1")),
    StreamConverters
      .fromInputStream(() => pipeOut)
      .addAttributes(ActorAttributes.dispatcher("blocking-2"))
  )

with the following configuration:

blocking-1 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

blocking-2 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

Because they no longer share a pool, fromOutputStream and fromInputStream can both perform their tasks independently.

Also note that I assigned only 2 threads per pool, to show that it is not about the thread count but about the pool separation.

I hope this helps you understand Akka streams better.