Sol*_*lma 2 parallel-processing f# asynchronous
以下函数通过首先将列表分解为大块然后处理每个块来并行化列表的处理.
let chunkList chunkSize (xs : list<'T>) =
query {
for idx in 0..(xs.Length - 1) do
groupBy (idx / chunkSize) into g
select (g |> Seq.map (fun idx -> xs.[idx]))
}
let par (foo: 'T -> 'S) (xs: list<'T>) =
xs
|> List.map (fun x -> async { return foo x })
|> Async.Parallel
|> Async.RunSynchronously
|> Array.toList
let parChunks chunkSize (f: 'T -> 'S) (xs: list<'T>) =
chunkList chunkSize xs |> Seq.map List.ofSeq |> List.ofSeq
|> par (List.map f)
|> List.concat
Run Code Online (Sandbox Code Playgroud)
该函数用于测试parChunks:
let g i = [1..1000000] |> List.map (fun x -> sqrt (float (1000 * x + 1))) |> List.head
Run Code Online (Sandbox Code Playgroud)
运行标准List.Seq和`parChunk``,块大小等于列表大小的1/2,有一个性能提升:
List.map g [1..100] ;; // Real:00:00:28.979,CPU:00:00:29.562
parChunks 50克[1..100] ;; // Real:00:00:23.027,CPU:00:00:24.687
但是,如果块大小等于列表大小的1/4,则性能几乎相同.我没想到这个,因为我的处理器(Intel 6700HQ)有四个核心.
parChunks 25 g [1..100] ;; // Real:00:00:21.695,CPU:00:00:24.437
在一个Performance应用程序中Task Manager查看从未使用过的四个核心.
有没有办法让所有四个核心参与这个计算?
我认为你这个问题太复杂了.
async工作流的主要用途不是针对CPU绑定工作,而是针对IO绑定工作,以避免在等待将以一些延迟到达的结果时阻塞线程.
虽然您可以并行使用CPU绑定工作async,但这样做并不是最理想的.
通过Array.Parallel在Arrays而不是Lists 上使用模块,可以更容易地实现您想要的.
let g i =
[|1..1000000|]
|> Array.Parallel.map (fun x -> sqrt (float (1000 * x + 1)))
|> Array.head
Run Code Online (Sandbox Code Playgroud)
无需编写自己的分块和合并代码,这些都是为您处理的,并且通过我的测量,它的速度要快得多.