Scala - 在多个线程内完成工作的最佳API

tts*_*ras 0 multithreading scala multiprocessing

在Python中,我使用了一个名为的库futures,它允许我以简洁明了的方式对N个工作进程池进行处理:

schedulerQ = []
for ... in ...:
    workParam = ...  # arguments for call to processingFunction(workParam)
    schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor:  # 5 CPUs
    for retValue in executor.map(processingFunction, schedulerQ):
        print "Received result", retValue
Run Code Online (Sandbox Code Playgroud)

(这processingFunction是CPU绑定的,所以这里的异步机器没有意义 - 这是关于简单的旧算术计算)

我现在正在寻找在Scala中做同样事情的最接近的方法.请注意,在Python中,为了避免GIL问题,我使用了进程(因此使用了ProcessPoolExecutor代替ThreadPoolExecutor) - 并且库自动地将workParam参数封送到每个执行的流程实例processingFunction(workParam)- 并且它将结果封送回主进程,执行者的map循环消费.

这适用于Scala和JVM吗?我的processingFunction原则上也可以从线程执行(根本没有全局状态) - 但是我有兴趣看到多处理和多线程的解决方案.

问题的关键部分是JVM世界中是否有任何与futures上面看到的Python一样明确的API ...我认为这是我见过的最好的SMP API之一 - 准备一个列表使用所有调用的函数参数,然后只有两行:创建poolExecutor和map处理函数,一旦工作人员生成结果就返回结果.一旦第一次调用processingFunction返回结果,结果就会立即开始,直到它们全部完成为止 - 此时for循环结束.

Rex*_*err 6

您使用Scala中的并行集合的样板少了.

myParameters.par.map(x => f(x))
Run Code Online (Sandbox Code Playgroud)

如果你想要默认的线程数(与核心数相同),你会发挥作用.

如果你坚持设定工人数量,你可以这样:

import scala.collection.parallel._
import scala.concurrent.forkjoin._

val temp = myParameters.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
temp.map(x => f(x))
Run Code Online (Sandbox Code Playgroud)

返回时间的确切细节是不同的,但您可以根据需要放置尽可能多的机器f(x)(即计算并对结果执行某些操作),这样可以满足您的需求.

一般来说,仅仅将结果显示为已完成是不够的; 然后你需要处理它们,可能分叉它们,收集它们等等.如果你想这样做一般,Akka Streams(从这里跟随链接)接近1.0,将有助于生成并行处理的复杂图形.