有时我想同时为网络活动等并行运行最大量的IO操作.我启动了一个小的并发线程函数,它可以很好地与https://gist.github.com/810920配合使用,但这不是'真的是一个游泳池,因为所有的IO动作必须先完成才能开始.
我正在寻找的类型是这样的:
runPool :: Int -> [IO a] -> IO [a]
Run Code Online (Sandbox Code Playgroud)
并且应该能够在有限和无限列表上运行.
管道包似乎能够很好地实现这一点,但我觉得可能有一个类似的解决方案,我只提供了来自haskell平台的mvars等.
有没有人遇到过没有任何重依赖的惯用解决方案?
你需要一个线程池,如果你想要一些简短的东西,你可以从Control.ThreadPool(来自控制引擎包,它也提供更多通用功能)中获取灵感,例如threadPoolIO就是:
threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b)
threadPoolIO nr mutator = do
input <- newChan
output <- newChan
forM_ [1..nr] $
\_ -> forkIO (forever $ do
i <- readChan input
o <- mutator i
writeChan output o)
return (input, output)
Run Code Online (Sandbox Code Playgroud)
它使用两个Chan与外部进行通信,但这通常是你想要的,它真的有助于编写不会搞砸的代码.
如果你绝对想要将它包装在你的类型的函数中,你也可以封装通信:
runPool :: Int -> [IO a] -> IO [a]
runPool n as = do
(input, output) <- threadPoolIO n (id)
forM_ as $ writeChan input
sequence (repeat (length as) $ readChan output)
Run Code Online (Sandbox Code Playgroud)
这不会保持您的操作顺序,这是一个问题(通过传输操作的索引或仅使用数组来存储响应很容易纠正)?
注意:使用这个简单版本,n个线程将永远保持活跃状态,如果您打算在长时间运行的应用程序中创建并删除其中的几个池,那么向threadPoolIO添加"killAll"返回的操作将会轻松解决此问题(如果没有,则给定在Haskell中线程的重量,它可能不值得麻烦).请注意,此函数仅适用于有限列表,这是因为IO通常是严格的,因此您无法在生成整个列表之前开始处理IO [a]的元素,如果您真的希望您要么使用惰性使用unsafeInterleaveIO的IO(可能不是最好的主意)或完全更改您的模型并使用管道之类的东西来传输结果.