jbe*_*man · Tags: concurrency, haskell, stream, conduit, haskell-pipes
I'm using the streaming library, but I'll accept answers that use pipes or conduit.
Say I have:
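```haskell
import Control.Monad (unless)
import Control.Monad.IO.Class (liftIO)
import Data.Function (fix)
import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

-- Thing and highLatencyGet :: Int -> IO Thing are assumed to be defined elsewhere.
streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID =
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- liftIO (highLatencyGet thingID)  -- the IO action must be lifted into Stream
      S.yield thing
      go (thingID + 1)
```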
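To reduce latency, I'd like to fork `highLatencyGet` so that it retrieves the next `Thing` in parallel while the consumer is still processing the previous `Thing`.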
Obviously I could rewrite my function to create a new `MVar` and fork the next fetch before calling `yield`, and so on.
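For reference, the hand-rolled approach might look roughly like the following. This is a minimal sketch under stated assumptions. It uses only `base`. `fetchOneAhead` is a hypothetical name, `fetch` stands in for `highLatencyGet`, and `emit` stands in for `S.yield`, so the sketch is not tied to the streaming API. It keeps exactly one fetch in flight and rethrows a failed fetch in the consuming thread.

```haskell
import Control.Concurrent
import Control.Exception (throwIO)

-- Hypothetical hand-rolled "fetch one ahead": start fetching item i+1 before
-- handing item i to the consumer. 'fetch' plays the role of highLatencyGet and
-- 'emit' the role of S.yield; both are stand-ins so the sketch needs only base.
fetchOneAhead :: (Int -> IO a) -> Int -> (a -> IO ()) -> IO ()
fetchOneAhead fetch lastID emit
  | lastID < 0 = pure ()
  | otherwise  = spawnFetch 0 >>= loop 0
  where
    -- Fork a fetch; its result (or its exception) is put into a fresh MVar.
    spawnFetch i = do
      var <- newEmptyMVar
      _ <- forkFinally (fetch i) (putMVar var)
      pure var
    loop i var = do
      -- Block until item i is available, rethrowing any fetch failure here.
      x <- takeMVar var >>= either throwIO pure
      -- Kick off item i+1 before the consumer starts working on item i.
      next <- if i < lastID then Just <$> spawnFetch (i + 1) else pure Nothing
      emit x
      maybe (pure ()) (loop (i + 1)) next
```

Because the prefetching logic is woven into the producer's own loop, it cannot be applied to an arbitrary existing stream. That is the composability problem raised here.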
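But I'd like to know whether there is an idiomatic (composable) way to do this, so that it could be packaged in a library and used on arbitrary `IO` streams. Ideally the amount of prefetching would also be configurable, e.g.: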
```haskell
prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
```
This solution uses pipes, but it could easily be adapted to streaming. Specifically, it requires the pipes, pipes-concurrency and async packages.
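It doesn't work in a "direct" style. Rather than simply transforming a `Producer`, it also takes a "folding function" that consumes a `Producer`. This continuation-passing style is necessary to set up and tear down the concurrency machinery.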
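```haskell
import Pipes
import Pipes.Concurrent (spawn', bounded, fromInput, toOutput, atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
  -- A bounded mailbox holding at most 'bufsize' prefetched values.
  (outbox, inbox, seal) <- spawn' (bounded bufsize)
  -- Seal the mailbox when either side finishes, normally or by exception.
  let cutcord :: IO x -> IO x
      cutcord effect = effect `finally` atomically seal
  runConcurrently $
    -- Writer: pump the original producer into the mailbox.
    Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
      *>
    -- Reader: fold over a producer that drains the mailbox; its result is returned.
    Concurrently (cutcord (foldfunc (fromInput inbox)))
```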
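The output of the original producer is redirected into a bounded queue. At the same time, we apply the producer-folding function to a producer that reads from that queue.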
Whenever either concurrent action finishes, we take care to seal the channel immediately, so the other side is not left dangling.
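The same design can be sketched without any streaming library, which makes the roles of the bounded queue and the teardown step explicit. This is an illustrative analogue, not the answer's code. `prefetchingCPS` is a hypothetical name. A producer is modelled as a function that pushes each element to a callback, and the folding function pulls elements with an `IO (Maybe a)` action. A `QSem` bounds how far the producer may run ahead. `bracket` stands in for the answer's `finally`/`seal` step: it kills the producer thread once the consumer returns or throws.

```haskell
import Control.Concurrent (forkIOWithUnmask, killThread, newChan, readChan, writeChan)
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (SomeException, bracket, throwIO, try)

-- Hypothetical library-free analogue of the pipes-based 'prefetching'.
-- Producer: pushes each element to a callback. Consumer: pulls with 'IO (Maybe a)'.
prefetchingCPS :: Int -> ((a -> IO ()) -> IO ()) -> (IO (Maybe a) -> IO r) -> IO r
prefetchingCPS bufsize produce consume = do
  slots <- newQSem (max 1 bufsize)  -- caps how far the producer may run ahead (at least 1)
  chan  <- newChan                  -- Right (Just x) = item, Right Nothing = end, Left e = failure
  let send x = waitQSem slots >> writeChan chan (Right (Just x))
      writer = do
        res <- try (produce send)
        writeChan chan (either Left (const (Right Nothing)) (res :: Either SomeException ()))
      receive = do
        msg <- readChan chan
        case msg of
          Left e         -> throwIO e                              -- producer failed: rethrow here
          Right Nothing  -> writeChan chan msg >> pure Nothing     -- keep end marker for later reads
          Right (Just x) -> signalQSem slots >> pure (Just x)      -- free a slot for the producer
  -- Teardown: when the consumer returns or throws, kill the producer thread so it is
  -- never left blocked on a full buffer. The thread runs unmasked despite 'bracket'.
  bracket (forkIOWithUnmask (\unmask -> unmask writer)) killThread (\_ -> consume receive)
```

The continuation-passing shape is what makes the `bracket` possible. The producer thread's lifetime is tied to the extent of the `consume` call, so nothing is left running once `prefetchingCPS` returns.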