如何在Scala中执行多个任务?

yur*_*ura 41 concurrency multithreading scala

我有50,000个任务,并希望用10个线程执行它们.在Java中我应该创建Executers.threadPool(10)并传递runnable然后等待处理所有.据我所知,Scala对该任务特别有用,但我无法在docs中找到解决方案.

mpi*_*ist 58

Scala 2.9.3及更高版本

最简单的方法是使用scala.concurrent.Future类和相关的基础结构.该scala.concurrent.future方法异步评估传递给它的块并立即返回Future[A]表示异步计算的块.期货可以通过多种非阻塞方式进行操作,包括映射,flatMapping,过滤,恢复错误等.

例如,这是一个创建10个任务的示例,其中每个任务都会休眠任意数量的时间,然后返回传递给它的值的平方.

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val tasks: Seq[Future[Int]] = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val aggregated: Future[Seq[Int]] = Future.sequence(tasks)

val squares: Seq[Int] = Await.result(aggregated, 15.seconds)
println("Squares: " + squares)
Run Code Online (Sandbox Code Playgroud)

在这个例子中,我们首先创建一系列单独的异步任务,完成后提供一个int.然后Future.sequence,我们将这些异步任务组合到一个异步任务中 - 交换类型Future和位置Seq.最后,我们在等待结果的同时阻止当前线程长达15秒.在该示例中,我们使用全局执行上下文,该上下文由fork/join线程池支持.对于非平凡的示例,您可能希望使用特定于应用程序ExecutionContext.

通常,应尽可能避免阻塞.有可用的其他组合子Future类,它可以帮助程序在异步风格,其中包括onSuccess,onFailure,和onComplete.

另外,考虑调查Akka库,它为Scala和Java提供基于actor的并发性,并与之互操作scala.concurrent.

Scala 2.9.2和之前的

这种最简单的方法是使用Scala的Future类,它是Actors框架的子组件.scala.actors.Futures.future方法为传递给它的块创建一个Future.然后,您可以使用scala.actors.Futures.awaitAll等待所有任务完成.

例如,这是一个创建10个任务的示例,其中每个任务都会休眠任意数量的时间,然后返回传递给它的值的平方.

import scala.actors.Futures._

val tasks = for (i <- 1 to 10) yield future {
  println("Executing task " + i)
  Thread.sleep(i * 1000L)
  i * i
}

val squares = awaitAll(20000L, tasks: _*)
println("Squares: " + squares)
Run Code Online (Sandbox Code Playgroud)

  • \ _*语法处理将tasks变量转换为awaitAll上的var-args调用.awaitAll方法采用var-arg Future [Any],任务变量是IndexedSeq [Future [Int]].添加\ _*告诉编译器将任务扩展为varargs. (5认同)
  • 使用期货时,默认情况下,任务在fork/join调度程序中执行,该调度程序为JVM报告的每个处理器分配最多2个线程.可以通过actors.corePoolSize系统属性增加最大线程数,也可以替换整个调度程序.有关详细信息,请参阅Actor的ScalaDoc. (3认同)
  • 我不需要执行10个任务.我有50,000,并希望用10个线程执行它们. (2认同)

Jan*_*anx 16

您想要查看Scala actor库或Akka.Akka具有更清晰的语法,但要么会做到这一点.

因此,您需要创建一个知道如何处理任务的演员池.一个Actor基本上可以是任何带有receive方法的类 - 来自Akka教程(http://doc.akkasource.org/tutorial-chat-server-scala):

class MyActor extends Actor {
  def receive = {
    case "test" => println("received test")
    case _ =>      println("received unknown message")
 }}

val myActor = Actor.actorOf[MyActor]
myActor.start
Run Code Online (Sandbox Code Playgroud)

您将需要创建一个actor实例池,并将您的任务作为消息发送给它们.这里有关于Akka actor池的帖子可能会有所帮助:http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/

在您的情况下,每个任务一个actor可能是合适的(与线程相比,actor非常轻量级,因此您可以在一个VM中拥有很多),或者您可能需要在它们之间进行更复杂的负载平衡.

编辑:使用上面的示例演员,发送消息就像这样简单:

myActor ! "test"
Run Code Online (Sandbox Code Playgroud)

然后,演员将"接收测试"输出到标准输出.

消息可以是任何类型,当与Scala的模式匹配结合使用时,您可以使用强大的模式来构建灵活的并发应用程序.

一般来说,Akka演员将在线程共享方面"做正确的事",而对于OP的需求,我想默认是好的.但是如果需要,可以将actor应该使用的调度程序设置为以下几种类型之一:

* Thread-based
* Event-based
* Work-stealing
* HawtDispatch-based event-driven
Run Code Online (Sandbox Code Playgroud)

为演员设置调度员是微不足道的:

class MyActor extends Actor {
  self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("thread-pool-dispatch")
    .withNewThreadPoolWithBoundedBlockingQueue(100)
    .setCorePoolSize(10)
    .setMaxPoolSize(10)
    .setKeepAliveTimeInMillis(10000)
    .build
}
Run Code Online (Sandbox Code Playgroud)

http://doc.akkasource.org/dispatchers-scala

通过这种方式,您可以限制线程池大小,但同样,使用默认调度程序的50K Akka actor实例可能会满足原始用例,并且它可以很好地并行化.

这实际上只是触及了Akka可以做的事情的表面.它带来了许多Erlang为Scala语言提供的功能.Actor可以监视其他actor并重新启动它们,从而创建自我修复的应用程序.Akka还提供软件事务内存和许多其他功能.它可以说是Scala的"杀手级应用"或"杀手级框架".


Dan*_*ral 8

如果你想"用10个线程执行它们",那么使用线程.Scala的演员模型,通常是人们在说Scala对并发性有利时所说的,隐藏了这些细节,所以你不会看到它们.

使用actor或者你拥有的所有东西都是简单的计算,你只需要创建50000个并让它们运行.您可能面临问题,但它们具有不同的性质.


Hol*_*ndl 8

这是另一个类似于mpilquist的响应的答案,但没有弃用的API,包括通过自定义ExecutionContext的线程设置:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._

val numJobs = 50000
var numThreads = 10

// customize the execution context to use the specified number of threads
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numThreads))


// define the tasks
val tasks = for (i <- 1 to numJobs) yield Future {
  // do something more fancy here
  i
}

// aggregate and wait for final result
val aggregated = Future.sequence(tasks)
val oneToNSum = Await.result(aggregated, 15.seconds).sum
Run Code Online (Sandbox Code Playgroud)