tts*_*ras 0 multithreading scala multiprocessing
在Python中,我使用了一个名为的库futures,它允许我以简洁明了的方式对N个工作进程池进行处理:
schedulerQ = []
for ... in ...:
workParam = ... # arguments for call to processingFunction(workParam)
schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor: # 5 CPUs
for retValue in executor.map(processingFunction, schedulerQ):
print "Received result", retValue
Run Code Online (Sandbox Code Playgroud)
(这processingFunction是CPU绑定的,所以这里的异步机器没有意义 - 这是关于简单的旧算术计算)
我现在正在寻找在Scala中做同样事情的最接近的方法.请注意,在Python中,为了避免GIL问题,我使用了进程(因此使用了ProcessPoolExecutor代替ThreadPoolExecutor) - 并且库自动地将workParam参数封送到每个执行的流程实例processingFunction(workParam)- 并且它将结果封送回主进程,执行者的map循环消费.
这适用于Scala和JVM吗?我的processingFunction原则上也可以从线程执行(根本没有全局状态) - 但是我有兴趣看到多处理和多线程的解决方案.
问题的关键部分是JVM世界中是否有任何与futures上面看到的Python一样明确的API ...我认为这是我见过的最好的SMP API之一 - 准备一个列表使用所有调用的函数参数,然后只有两行:创建poolExecutor和map处理函数,一旦工作人员生成结果就返回结果.一旦第一次调用processingFunction返回结果,结果就会立即开始,直到它们全部完成为止 - 此时for循环结束.
您使用Scala中的并行集合的样板少了.
myParameters.par.map(x => f(x))
Run Code Online (Sandbox Code Playgroud)
如果你想要默认的线程数(与核心数相同),你会发挥作用.
如果你坚持设定工人数量,你可以这样:
import scala.collection.parallel._
import scala.concurrent.forkjoin._
val temp = myParameters.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
temp.map(x => f(x))
Run Code Online (Sandbox Code Playgroud)
返回时间的确切细节是不同的,但您可以根据需要放置尽可能多的机器f(x)(即计算并对结果执行某些操作),这样可以满足您的需求.
一般来说,仅仅将结果显示为已完成是不够的; 然后你需要处理它们,可能分叉它们,收集它们等等.如果你想这样做一般,Akka Streams(从这里跟随链接)接近1.0,将有助于生成并行处理的复杂图形.
| 归档时间: |
|
| 查看次数: |
160 次 |
| 最近记录: |