Ben*_*man 2 parallel-processing concurrency scala fork-join actor
我想知道是否有一种方法可以在scala中的另一个线程上执行非常简单的任务,而这些任务没有很多开销?
基本上我想创建一个可以处理执行任意数量任务的全局"执行器".然后我可以使用执行程序来构建其他构造.
此外,如果客户端不必考虑阻塞或非阻塞因素,那将是很好的.
我知道scala actor库是建立在Doug Lea FJ之上的,而且他们在有限的程度上支持我想要完成的事情.但是根据我的理解,我将不得不预先分配一个'Actor Pool'来完成.
我想避免为此创建一个全局线程池,因为据我所知,它在细粒度并行性方面并不是那么好.
这是一个简单的例子:
import concurrent.SyncVar
object SimpleExecutor {
import actors.Actor._
def exec[A](task: => A) : SyncVar[A] = {
//what goes here?
//This is what I currently have
val x = new concurrent.SyncVar[A]
//The overhead of making the actor appears to be a killer
actor {
x.set(task)
}
x
}
//Not really sure what to stick here
def execBlocker[A](task: => A) : SyncVar[A] = exec(task)
}
Run Code Online (Sandbox Code Playgroud)
现在使用exec的一个例子:
object Examples {
//Benchmarks a task
def benchmark(blk : => Unit) = {
val start = System.nanoTime
blk
System.nanoTime - start
}
//Benchmarks and compares 2 tasks
def cmp(a: => Any, b: => Any) = {
val at = benchmark(a)
val bt = benchmark(b)
println(at + " " + bt + " " +at.toDouble / bt)
}
//Simple example for simple non blocking comparison
import SimpleExecutor._
def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
//Simple example for the blocking performance
import Thread.sleep
def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}
Run Code Online (Sandbox Code Playgroud)
最后运行示例(可能想要做几次,以便HotSpot可以预热):
import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
Run Code Online (Sandbox Code Playgroud)
这就是Futures为此而做的.只是import scala.actors.Futures._,future用来创建新的期货,awaitAll等待结果一段时间的方法,apply或respond阻止直到收到结果,isSet看看它是否准备好等等.
您也不需要创建线程池.或者,至少,通常不会.为什么你认为你呢?
编辑
你无法获得并行化的功能,就像整数加法一样简单,因为它甚至比函数调用更快.并发只会通过避免时间丢失来阻止i/o以及使用多个CPU内核并行执行任务来提高性能.在后一种情况下,任务的计算成本必须足以抵消划分工作量和合并结果的成本.
寻求并发的另一个原因是提高应用程序的响应能力.这并没有使它更快,这使得它对用户的响应更快,并且这样做的一种方法是将相对快速的操作卸载到另一个线程,以便处理用户看到或做的事情的线程可以更快.但我离题了.
您的代码存在严重问题:
def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
Run Code Online (Sandbox Code Playgroud)
或者,翻译成期货,
def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
Run Code Online (Sandbox Code Playgroud)
您可能认为paraAdd在paralallel中执行任务,但事实并非如此,因为Range它具有非严格的实现map(这取决于Scala 2.7;从Scala 2.8.0开始,Range是严格的).您可以查看其他Scala问题.这是怎么回事:
0直到创建hifuture(i+5)在调用时返回的函数.i => future(i+5)),对元素进行求值(foreach严格),然后在其apply上调用函数.所以,因为future是不是在步骤2中调用,但只在第3步,你会等待每个future做下一个之前完成.你可以修复它:
def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)
Run Code Online (Sandbox Code Playgroud)
这将为您提供更好的性能,但永远不会像简单的立即添加那么好.另一方面,假设你这样做:
def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) =
(0 until n).force map (_ => future(f)) foreach (_.apply)
Run Code Online (Sandbox Code Playgroud)
然后比较:
cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))
Run Code Online (Sandbox Code Playgroud)
您可能会开始看到增益(这取决于内核数量和处理器速度).
| 归档时间: |
|
| 查看次数: |
1897 次 |
| 最近记录: |