I'm building an application with the following flow:

Elements from a source are piped through an external process (it will be ffmpeg in the end, but for this simple reproducible use case it is just cat, which passes the data straight through), and the output is saved to a file. So I'm doing the following:

I'm combining ProcessBuilder with Flow.fromSinkAndSource to build the flow that executes this external process. Full code example:
import akka.actor.ActorSystem
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl._
import akka.stream.ClosedShape
import akka.util.ByteString

import java.io.{BufferedInputStream, BufferedOutputStream}
import java.nio.file.Paths
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object MyApp extends App {

  // When this is changed to something above 15, the graph just stops
  val PROCESSES_COUNT = Integer.parseInt(args(0))

  println(s"Running with ${PROCESSES_COUNT} processes...")

  implicit val system = ActorSystem("MyApp")
  implicit val globalContext: ExecutionContext = ExecutionContext.global

  def executeCmdOnStream(cmd: String): Flow[ByteString, ByteString, _] = {
    val convertProcess = new ProcessBuilder(cmd).start
    val pipeIn = new BufferedOutputStream(convertProcess.getOutputStream)
    val pipeOut = new BufferedInputStream(convertProcess.getInputStream)
    Flow
      .fromSinkAndSource(StreamConverters.fromOutputStream(() => pipeIn), StreamConverters.fromInputStream(() => pipeOut))
  }

  val source = Source(1 to 100)
    .map(element => {
      println(s"--emit: ${element}")
      ByteString(element)
    })

  val sinksList = (1 to PROCESSES_COUNT).map(i => {
    Flow[ByteString]
      .via(executeCmdOnStream("cat"))
      .toMat(FileIO.toPath(Paths.get(s"process-$i.txt")))(Keep.right)
  })

  val graph = GraphDSL.create(sinksList) { implicit builder => sinks =>
    val broadcast = builder.add(Broadcast[ByteString](sinks.size))
    source ~> broadcast.in
    for (i <- broadcast.outlets.indices) {
      broadcast.out(i) ~> sinks(i)
    }
    ClosedShape
  }

  Await.result(Future.sequence(RunnableGraph.fromGraph(graph).run()), Duration.Inf)

}

Run it with the following command:
\nsbt "run PROCESSES_COUNT"\nRun Code Online (Sandbox Code Playgroud)\nIE
\nsbt "run 15"\nRun Code Online (Sandbox Code Playgroud)\n这一切都运行得很好,直到我增加“外部进程”的数量(代码中的 PROCESSES_COUNT)。当它为15或更少时,一切顺利,但当它为16 或更多时,会发生以下情况:
The cat processes are started in the system (all 16 of them). When I manually kill one of those cat processes, something gets released and processing continues (of course, one of the resulting files is then empty, because I killed its processing command). I have checked that this is definitely caused by the external execution (and not by a limitation of Akka's Broadcast itself).
I recorded a video showing both cases (first 15 items working fine, then 16 items hanging and being released by killing one of the processes) - video link

Both the code and the video are in this repository.

I would appreciate any help or suggestions on where to look for a solution to this problem.
It is an interesting problem, and it does look like the stream is dead-locking. Increasing the number of threads may fix the symptom but not the underlying problem.

The problem is the following code:
Flow
.fromSinkAndSource(
StreamConverters.fromOutputStream(() => pipeIn),
StreamConverters.fromInputStream(() => pipeOut)
)
Both fromInputStream and fromOutputStream will be using the same default-blocking-io-dispatcher as you correctly noticed. The reason for using a dedicated thread pool is that both perform Java API calls that are blocking the running thread.
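For reference, the pool behind that dispatcher is quite small. The exact configuration path differs between Akka versions, but the entry in Akka's reference.conf looks roughly like this, with a fixed pool size of 16 threads:

default-blocking-io-dispatcher {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 16
  }
}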
Here is a part of a thread stack trace of fromInputStream that shows where blocking is happening.
at java.io.FileInputStream.readBytes(java.base@11.0.13/Native Method)
at java.io.FileInputStream.read(java.base@11.0.13/FileInputStream.java:279)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.lang.ProcessImpl$ProcessPipeInputStream)
at java.io.BufferedInputStream.read1(java.base@11.0.13/BufferedInputStream.java:290)
at java.io.BufferedInputStream.read(java.base@11.0.13/BufferedInputStream.java:351)
- locked <merged>(a java.io.BufferedInputStream)
at java.io.FilterInputStream.read(java.base@11.0.13/FilterInputStream.java:107)
at akka.stream.impl.io.InputStreamSource$$anon$1.onPull(InputStreamSource.scala:63)
Now, you're running 16 simultaneous Sinks that are connected to a single Source. To support back-pressure, a Source will only produce an element when all Sinks send a pull command.
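That coupling can be observed in isolation with a small, self-contained sketch (this is not the question's code; the names and numbers are made up): one deliberately slow consumer behind a broadcast holds back a fast one as soon as the small internal buffers are full.

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.duration._

object BroadcastBackpressureDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  import system.dispatcher

  val fast = Sink.foreach[Int](i => println(s"fast: $i"))
  // One consumer that can only handle one element per second.
  val slow = Flow[Int].throttle(1, 1.second).to(Sink.foreach[Int](i => println(s"slow: $i")))

  // alsoTo broadcasts every element to the slow sink as well. After the small
  // internal buffers fill up, the fast sink only advances at the slow sink's rate.
  Source(1 to 100).alsoTo(slow).runWith(fast).onComplete(_ => system.terminate())
}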
In the question's setup, what happens is that you end up with 16 simultaneous FileInputStream.readBytes calls, which immediately block all threads of the default-blocking-io-dispatcher (the 16 threads of that pool, as shown above, which is why the problem appears at exactly 16 processes). There are no threads left for fromOutputStream to write any data, or for the Source to perform any work at all. Hence, you are in a deadlock.
如果增加池中的线程,则可以解决该问题。但这只是消除了症状。
正确的解决方案是在两个独立的线程池中运行fromOutputStream和。fromInputStream以下是您可以如何做到这一点。
// requires: import akka.stream.ActorAttributes
Flow
  .fromSinkAndSource(
    StreamConverters
      .fromOutputStream(() => pipeIn)
      .addAttributes(ActorAttributes.dispatcher("blocking-1")),
    StreamConverters
      .fromInputStream(() => pipeOut)
      .addAttributes(ActorAttributes.dispatcher("blocking-2"))
  )
具有以下配置
blocking-1 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}

blocking-2 {
  type = "Dispatcher"
  executor = "thread-pool-executor"
  throughput = 1
  thread-pool-executor {
    fixed-pool-size = 2
  }
}
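These two dispatcher blocks belong at the top level of application.conf. If you prefer to keep the example self-contained, the same configuration can also be supplied programmatically when the ActorSystem is created; a minimal sketch (assuming the dispatcher names above) looks like this:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Define the two dedicated dispatchers in code and fall back to the regular
// application.conf / reference.conf for everything else.
val dispatcherConfig = ConfigFactory.parseString(
  """
    |blocking-1 {
    |  type = "Dispatcher"
    |  executor = "thread-pool-executor"
    |  throughput = 1
    |  thread-pool-executor { fixed-pool-size = 2 }
    |}
    |blocking-2 {
    |  type = "Dispatcher"
    |  executor = "thread-pool-executor"
    |  throughput = 1
    |  thread-pool-executor { fixed-pool-size = 2 }
    |}
  """.stripMargin)

implicit val system: ActorSystem = ActorSystem("MyApp", dispatcherConfig.withFallback(ConfigFactory.load()))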
因为他们不再共享池,所以都fromOutputStream可以fromInputStream独立执行任务。
另请注意,我只是2为每个池分配了线程,以表明它与线程计数无关,而是与池分离有关。
我希望这有助于更好地理解 akka 流。