V. *_*ria 0 parallel-processing haskell
我的确切示例是并行化这种树聚合,其中信息从叶子流向根:
aggregate :: ([a] -> a) -> Tree a -> Tree a
aggregate _ (Node x []) = Node x []
aggregate aggregator (Node _ children) =
let agChildren = map (aggregate aggregator) children in
Node (aggregator $ map (\(Node y _) -> y) agChildren) agChildren
Run Code Online (Sandbox Code Playgroud)
我希望聚合器函数的每个应用程序都在不同的线程上处理.所以我想想改变上面的代码,以便它生成一个依赖任务树并将它们提供给一个线程池.
我不希望某个节点受到影响,等待子线程完成.相反,这个等待的线程应该计算树中的其他可用子节点.同时为每个节点运行一个线程也太慢了.我的树可以有数百个节点,我的机器只有8个核心:他们会花时间安排而不是计算.我需要一个只在其他任务完成时才使用任务的线程池.
如下面的ErikR所提到的,parMap
似乎正是这样做的.我尝试了它并用strat 64 + RTS -N2执行它,以获得完全相同的计算时间.这是代码(只是为了测试性能而进行愚蠢的计算),你明白为什么时间不会改变吗?
slowAggregate :: [Int] -> Int
slowAggregate l = let s = sum l in
sum [a + b + c | a <- [0..s], b <- [0..s], c <- [0..s] ]
bigTree :: Tree Int
bigTree = Node 0 $ map (\x -> Node x []) [71..78]
aggregate :: NFData a => ([a] -> a) -> Tree a -> Tree a
aggregate _ (Node x []) = Node x []
aggregate aggregator (Node _ children) =
let agChildren = parMap rdeepseq (aggregate aggregator) children in
Node (aggregator $ map (\(Node y _) -> y) agChildren) agChildren
main = timeIt $ let (Node y _) = aggregate slowAggregate bigTree in print y
Run Code Online (Sandbox Code Playgroud)
并行运行时已经管理了一个用于sparks的线程池.从这个SO回答:
火花不是线程.forkIO引入了Haskell线程(它映射到较少的实际OS线程).Sparks在每个线程的工作队列中创建条目,如果线程变为空闲,它们将从中执行任务.
所以我首先尝试使用parMap strat
,看看它是否适合你.
事实上,如果你的树是Traversable,我会看看parTraversable
:
parTraversable :: Traversable t => Strategy a -> Strategy (t a)
Run Code Online (Sandbox Code Playgroud)