Scala中如何控制未来的并发?

ung*_*cky 0 parallel-processing scala

我是 Scala 的新手。我对 Scala 的未来概念有一个一般性疑问。

假设我有一个元素列表,并且列表中存在每个元素,我必须调用一个进行一些处理的方法。

我们可以使用未来的方法并且可以并行进行处理,但我的问题是我们如何控制并行/后台运行的并发处理任务。

例如,我应该将并行运行任务限制保持为 10。因此,在 Max 时,我的未来应该生成对列表中 10 个元素的处理,并等待任何生成的进程完成。一旦空闲插槽可用,它应该为剩余元素生成进程,直到达到最大值。

我在谷歌中搜索但找不到它。在 Unix 中,可以通过在后台运行进程并使用 ps 命令手动检查计数来完成相同的操作。由于不太了解 Scala。请帮助我。

提前致谢。

Mar*_*lic 6

让我们创建两个不同大小的线程池:

val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
Run Code Online (Sandbox Code Playgroud)

我们可以通过将线程池作为参数传递给 future 来控制将来运行哪个线程池,如下所示

Future(42)(tenThreadsEc)
Run Code Online (Sandbox Code Playgroud)

这相当于

Future.apply(body = 42)(executor = tenThreadsEc)
Run Code Online (Sandbox Code Playgroud)

对应于签名Future.apply

def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T] =
Run Code Online (Sandbox Code Playgroud)

请注意参数是如何executor声明为 的implicit。这意味着我们可以像这样隐式提供它

implicit val tenThreadsEc = ...
Future(42) // executor = tenThreadsEc argument passed in magically
Run Code Online (Sandbox Code Playgroud)

现在,根据路易斯的建议,考虑简化签名Future.traverse

def traverse[A, B, M[X] <: IterableOnce[X]](in: M[A])(fn: A => Future[B])(implicit ..., executor: ExecutionContext): Future[M[B]]
Run Code Online (Sandbox Code Playgroud)

让我们通过将M类型构造函数参数固定为 a 来进一步简化它M = List

def traverse[A, B]
  (in: List[A])                          // list of things to process in parallel
  (fn: A => Future[B])                   // function to process an element asynchronously
  (implicit executor: ExecutionContext)  // thread pool to use for parallel processing
: Future[List[B]]                        // returned result is a future of list of things instead of list of future things
Run Code Online (Sandbox Code Playgroud)

让我们传递参数

val tenThreadsEc = ...
val myList: List[Int] = List(11, 42, -1)
def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(ec)

Future.traverse[Int, Int, List](
  in       = myList)(
  fn       = myFun(_)(executor = tenThreadsEc))(
  executor = tenThreadsEc,
  bf       = implicitly               // ignore this
)
Run Code Online (Sandbox Code Playgroud)

依靠隐式解析和类型推断,我们只需

implicit val tenThreadsEc = ... 
Future.traverse(myList)(myFun)
Run Code Online (Sandbox Code Playgroud)

把它们放在一起,这是一个工作示例

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object FuturesExample extends App {
  val fiveThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(5))
  val tenThreadsEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val myList: List[Int] = List(11, 42, -1)
  def myFun(x: Int)(implicit executor: ExecutionContext): Future[Int] = Future(x + 1)(executor)

  Future(body = 42)(executor = fiveThreadsEc)
    .andThen(v => println(v))(executor = fiveThreadsEc)

  Future.traverse[Int, Int, List](
    in = myList)(
    fn = myFun(_)(executor = tenThreadsEc))(
    executor = tenThreadsEc,
    bf = implicitly
  ).andThen(v => println(v))(executor = tenThreadsEc)

  // Using implicit execution context call-site simplifies to...
  implicit val ec = tenThreadsEc

  Future(42)
    .andThen(v => println(v))

  Future.traverse(myList)(myFun)
    .andThen(v => println(v))
}
Run Code Online (Sandbox Code Playgroud)

哪个输出

Success(42)
Success(List(12, 43, 0))
Success(42)
Success(List(12, 43, 0))
Run Code Online (Sandbox Code Playgroud)

或者,Scala 提供默认执行上下文,称为

scala.concurrent.ExecutionContext.Implicits.global
Run Code Online (Sandbox Code Playgroud)

我们可以通过系统属性来控制它的并行度

scala.concurrent.context.minThreads
scala.concurrent.context.numThreads
scala.concurrent.context.maxThreads
scala.concurrent.context.maxExtraThreads
Run Code Online (Sandbox Code Playgroud)

例如,创建以下内容ConfiguringGlobalExecutorParallelism.scala

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object ConfiguringGlobalExecutorParallelism extends App {
  println(scala.concurrent.ExecutionContext.Implicits.global.toString)

  Future.traverse(List(11,42,-1))(x => Future(x + 1))
    .andThen(v => println(v))
}
Run Code Online (Sandbox Code Playgroud)

并运行它

scala -Dscala.concurrent.context.numThreads=10 -Dscala.concurrent.context.maxThreads=10 ConfiguringGlobalExecutorParallelism.scala
Run Code Online (Sandbox Code Playgroud)

哪个应该输出

scala.concurrent.impl.ExecutionContextImpl$$anon$3@cb191ca[Running, parallelism = 10, size = 0, active = 0, running = 0, steals = 0, tasks = 0, submissions = 0]
Success(List(12, 43, 0))
Run Code Online (Sandbox Code Playgroud)

注意如何parallelism = 10

另一种选择是使用并行集合

libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "0.2.0"
Run Code Online (Sandbox Code Playgroud)

并通过配置并行性tasksupport,例如

val myParVector: ParVector[Int] = ParVector(11, 42, -1)
myParVector.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
myParVector.map(x => x + 1)
Run Code Online (Sandbox Code Playgroud)

请注意,并行集合是与 Future 不同的功能

Scala 中的并行集合设计没有 an 的概念 ExecutionContext,它严格来说是 的属性Future。并行集合库有一个 a 的概念TaskSupport,负责并行集合内部的调度

x => x + 1所以我们可以简单地使用而不是映射集合x => Future(x + 1),并且不需要使用Future.traverse,只需一个常规映射就足够了。