mat*_*ieu · 0 · tags: scala, future, threadpool
I have a large computation roughly based on the following pattern:
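import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def f1(i: Int): Int = ???
def f2(i: Int): Int = ???

def processA(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f1(i)))

def processB(l: List[Int]): List[Future[Int]] = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main(): Unit = {
  val items: List[Int] = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.foreach(_.onComplete { _ => /* ... */ })
}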
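If my understanding is correct, the problem is that processing is breadth-first: processA starts thousands of futures, and processB then queues thousands of new futures that only get processed after processA's futures are done. As a result, the onComplete callbacks start firing very late...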
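To make the breadth-first ordering visible, here is a minimal sketch that keeps the question's processA/processB shape but uses hypothetical dummy f1/f2 that only record when they run. It assumes a single-threaded executor (`Executors.newSingleThreadExecutor`) so that the FIFO task queue gives a deterministic order; with a multi-threaded pool the interleaving is looser, but f2 tasks still tend to queue behind the f1 tasks already submitted.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BreadthFirstDemo {
  def run(n: Int): Vector[String] = {
    // Assumption: one worker thread with a FIFO queue, so the order is deterministic
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val log = ArrayBuffer.empty[String]
    def record(s: String): Unit = log.synchronized { log += s }

    // Hypothetical dummies for f1/f2 that only record when they run
    def f1(i: Int): Int = { record("f1"); i * 2 }
    def f2(i: Int): Int = { record("f2"); i + 1 }

    // Same shape as processA/processB in the question
    def processA(l: List[Int]): List[Future[Int]] = l.map(i => Future(f1(i)))
    def processB(l: List[Int]): List[Future[Int]] = processA(l).map(fut => fut.map(f2))

    try {
      Await.result(Future.sequence(processB((1 to n).toList)), 10.seconds)
      log.synchronized(log.toVector)
    } finally pool.shutdown()
  }
}
```

`BreadthFirstDemo.run(3)` records f1, f1, f1, f2, f2, f2: every f2 task is enqueued only after processA has already submitted all of its f1 tasks.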
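I'd like to make this depth-first: start a few of processA's futures, then have processB continue from there instead of switching to other things in the queue.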
Can this be done in vanilla Scala? Or should I switch to some library that offers an alternative to Futures and thread pools?
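EDIT: a bit more detail. Rewriting this as f1 andThen f2, as suggested in the answers, is currently impractical. In reality, processA and processB do a bunch of other things (including side effects), and the fact that processB depends on processA is private; exposing it would break separation of concerns (SoC).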
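Under that constraint (processA stays opaque and is not fused with f2), one vanilla-Future option is to have processB hand processA one small batch at a time and only request the next batch once the current one has gone through f2. This is a sketch, not the asker's code: `batchSize`, the single-threaded executor and the logging f1/f2 are assumptions for illustration, it assumes processB may call processA on sublists, and it returns one `Future[List[Int]]` instead of a list of per-item futures.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BatchedProcessB {
  def run(items: List[Int], batchSize: Int): (List[Int], Vector[String]) = {
    // Assumption: single worker thread so the log order is deterministic
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val log = ArrayBuffer.empty[String]
    def record(s: String): Unit = log.synchronized { log += s }

    // Hypothetical dummies for f1/f2
    def f1(i: Int): Int = { record("f1"); i * 2 }
    def f2(i: Int): Int = { record("f2"); i + 1 }

    // processA is unchanged: processB only calls it and never looks inside
    def processA(l: List[Int]): List[Future[Int]] = l.map(i => Future(f1(i)))

    // processA(batch) is called inside flatMap, so a batch starts only after
    // every f2 of the previous batch has completed
    def processB(l: List[Int]): Future[List[Int]] =
      l.grouped(batchSize).foldLeft(Future.successful(Vector.empty[Int])) { (acc, batch) =>
        acc.flatMap { done =>
          Future.sequence(processA(batch).map(_.map(f2))).map(done ++ _)
        }
      }.map(_.toList)

    try {
      val results = Await.result(processB(items), 10.seconds)
      (results, log.synchronized(log.toVector))
    } finally pool.shutdown()
  }
}
```

With four items and `batchSize = 2` the log reads f1, f1, f2, f2, f1, f1, f2, f2, so each batch is finished before the next one starts. The trade-off is that one slow item holds up its whole batch.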
EDIT 2: I think I'll relax the "vanilla" constraint a bit. Someone suggested Akka Streams could help. I'm currently looking at scalaz.Task: any opinions?
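I'm not 100% sure I understand the question: processB (f2) runs on the results of processA (f1), and you can't call f2 on a value that f1 hasn't computed yet, so my answer assumes that f2 is executed immediately after f1. Here is a solution: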
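The assumption that f2 runs immediately after f1 is what makes the processing depth-first: each item's two steps live in a single task, as in the `n => f2(f1(n))` passed to `process` in the REPL session. A minimal sketch contrasting this with the question's pattern, again assuming a single-threaded executor and hypothetical logging dummies for f1/f2:

```scala
import java.util.concurrent.Executors
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FusedDemo {
  def run(n: Int): Vector[String] = {
    // Assumption: one worker thread with a FIFO queue, for a deterministic log
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val log = ArrayBuffer.empty[String]
    def record(s: String): Unit = log.synchronized { log += s }

    // Hypothetical dummies that only record when they run
    def f1(i: Int): Int = { record("f1"); i * 2 }
    def f2(i: Int): Int = { record("f2"); i + 1 }

    // One task per item: f2 runs in the same task, right after f1
    val futures = (1 to n).toList.map(i => Future(f2(f1(i))))

    try {
      Await.result(Future.sequence(futures), 10.seconds)
      log.synchronized(log.toVector)
    } finally pool.shutdown()
  }
}
```

Here `FusedDemo.run(3)` records f1, f2, f1, f2, f1, f2: an item is fully processed before the next one starts.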
import scala.concurrent._

def process(noAtATime: Int, l: List[Int])(transform: Int => Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
  // define an inner async "loop" to process one chunk of numbers at a time
  def batched(i: Future[Iterator[List[Int]]], result: List[List[Int]]): Future[List[Int]] =
    i flatMap { it =>
      // if there are more chunks to process
      // we process all numbers in the chunk as parallel as possible,
      // then combine the results into a List again, then when all are done,
      // we recurse via flatMap+batched with the iterator
      // when we have no chunks left, then we un-chunk the results
      // reassemble it into the original order and return the result
      if (it.hasNext) Future.traverse(it.next())(n => Future(transform(n))).flatMap(re => batched(i, re :: result))
      else Future.successful(result.reverse.flatten) // Optimize this as needed
    }

  // Start the async "loop" over chunks of input and with an empty result
  batched(Future.successful(l.grouped(noAtATime)), List.empty)
}
scala> def f1(i: Int) = i * 2 // Dummy impl to prove it works
f1: (i: Int)Int
scala> def f2(i: Int) = i + 1 // Dummy impl to prove it works
f2: (i: Int)Int
scala> process(noAtATime = 100, (1 to 10000).toList)(n => f2(f1(n)))(ExecutionContext.global)
res0: scala.concurrent.Future[List[Int]] = Future(<not completed>)
scala> res0.foreach(println)(ExecutionContext.global)
scala> List(3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99, 101, 103, 105, 107, 109, 111, 113, 115, 117, 119 …
If you are willing and able to use a library better suited to the problem at hand, have a look at this reply.