oxb*_*kes 8 concurrency scala actor
正如我自己对自己的问题的回答一样,我遇到了处理大量事件的情况.每个事件都以完全相同的方式处理,甚至可以独立于所有其他事件处理每个事件.
我的程序利用了Scala并发框架,并且涉及的许多过程都被建模为Actors.由于ActorS程序的消息顺序,他们不是非常适合于这个特定的问题(即使我的其他演员在执行哪些操作是连续的).因为我希望Scala"控制"所有线程创建(我假设它首先有一个并发系统),我似乎有两个选择:
Actor通过其他机制同时处理它们我原以为#1否定了使用actors子系统的要点:我应该创建多少个处理器actor?是一个明显的问题.这些东西据说对我来说是隐藏的,并由子系统解决.
我的回答是做以下事情:
val eventProcessor = actor {
loop {
react {
case MyEvent(x) =>
//I want to be able to handle multiple events at the same time
//create a new actor to handle it
actor {
//processing code here
process(x)
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
有更好的方法吗?这是不正确的?
编辑:一个可能更好的方法是:
val eventProcessor = actor {
loop {
react {
case MyEvent(x) =>
//Pass processing to the underlying ForkJoin framework
Scheduler.execute(process(e))
}
}
}
Run Code Online (Sandbox Code Playgroud)
这似乎是另一个问题的重复.所以我会重复我的回答
Actor一次处理一条消息.处理多个消息的经典模式是为消费者角色池设置一个协调者角色.如果使用react,则使用者池可能很大,但仍然只使用少量JVM线程.这是一个例子,我为他们创建了一个由10个消费者和一个协调员组成的池.
import scala.actors.Actor
import scala.actors.Actor._
case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop
def consumer(n : Int) = actor {
loop {
react {
case Ready(sender) =>
sender ! Ready(self)
case Request(sender, payload) =>
println("request to consumer " + n + " with " + payload)
// some silly computation so the process takes awhile
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
sender ! Result(result)
println("consumer " + n + " is done processing " + result )
case Stop => exit
}
}
}
// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)
val coordinator = actor {
loop {
react {
case msg @ Request(sender, payload) =>
consumers foreach {_ ! Ready(self)}
react {
// send the request to the first available consumer
case Ready(consumer) => consumer ! msg
}
case Stop =>
consumers foreach {_ ! Stop}
exit
}
}
}
// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)
Run Code Online (Sandbox Code Playgroud)
此代码测试以查看哪些消费者可用并向该使用者发送请求.替代方案是随机分配给消费者或使用循环调度程序.
根据您的工作情况,您可能会更好地使用Scala的期货.例如,如果你真的不需要演员,那么所有上述机器都可以写成
import scala.actors.Futures._
def transform(payload : String) = {
val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
println("transformed " + payload + " to " + result )
result
}
val results = for (i <- 0 to 1000) yield future(transform(i.toString))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2164 次 |
| 最近记录: |