我非常喜欢管道/管道的概念,用于将操作应用于流式IO源.我感兴趣的是构建适用于非常大的日志文件的工具.从Python/Ruby迁移到Haskell的一个吸引人的地方是编写并行代码的简单方法,但我找不到任何这方面的文档.我怎么能设置一个管道流来读取文件中的行并且并行处理它们(即有8个核,它应该读取8行,然后将它们移交给8个不同的线程进行处理,然后再次收集等),理想情况下尽可能少的"仪式"......
可选地,可以注意线是否需要按顺序重新加入,如果这可能影响过程的速度?
我确信可以使用Parallel Haskell书中的想法自己拼凑一些东西,但在我看来,在Conduit工作流程中间并行(parmap等)运行纯函数应该非常简单?
作为PetrPudlák在他的评论中提到的"内部并行性"的一个例子,考虑这个函数(我正在使用pipes
,但可以conduit
同样容易地实现):
import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L
concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer =
L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
>->
forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield)
Run Code Online (Sandbox Code Playgroud)
该函数作为参数的一组尺寸,我们希望对于类型的每个值运行操作a
,和一个Producer
的a
值.
它返回一个新的Producer
.在内部,生产者a
分批读取值,groupsize
同时处理它们,并逐个产生结果.
该代码用于Pipes.Group
将原始生成器"分区"为大小的子生成器groupsize
,然后Control.Foldl
将每个子生成器"折叠"到列表中.
对于更复杂的任务,您可以转向pipes-concurrency
或提供的异步通道stm-conduit
.但是这些让你有点像香草管道/管道的"单一管道"世界观.