流媒体库中的惯用预取

jbe*_*man 3 concurrency haskell stream conduit haskell-pipes

我正在使用流媒体库,但会接受使用管道或管道的答案.

说我有

import Streaming (Stream, Of)
import qualified Streaming.Prelude as S

streamChunks :: Int -> Stream (Of Thing) IO ()
streamChunks lastID = do
  flip fix 0 $ \go thingID ->
    unless (thingID > lastID) $ do
      thing <- highLatencyGet thingID
      S.yield thing
      go (thingID+1)
Run Code Online (Sandbox Code Playgroud)

为了减少延迟,我想在处理消费者中的前highLatencyGet一个Thing时并行分叉以检索下一个Thing.

显然,我可以MVar在调用之前将我的函数转换为创建新的并分叉下一批yield,等等.

但我想知道是否有一种惯用(可组合)的方法来做到这一点,这样它就可以打包在一个库中,并且可以在任意IO上使用Stream.理想情况下,我们也可以配置预取值,例如:

prefetching :: Int -> Stream (Of a) IO () -> Stream (Of a) IO ()
Run Code Online (Sandbox Code Playgroud)

dan*_*iaz 7

此解决方案使用管道,但它可以很容易地适应使用流媒体.确切地说,它需要管道,管道 - 并发异步包.

它不能以"直接"的方式工作.Producer它不仅仅是简单地转换,还需要一个消耗a的"折叠功能" Producer.这种延续传递方式对于设置和拆除并发机制是必要的.

import Pipes
import Pipes.Concurrent (spawn',bounded,fromInput,toOutput,atomically)
import Control.Concurrent.Async (Concurrently(..))
import Control.Exception (finally)

prefetching :: Int -> Producer a IO () -> (Producer a IO () -> IO r) -> IO r
prefetching bufsize source foldfunc = do
    (outbox,inbox,seal) <- spawn' (bounded bufsize)
    let cutcord effect = effect `finally` atomically seal
    runConcurrently $
        Concurrently (cutcord (runEffect (source >-> toOutput outbox)))
        *>
        Concurrently (cutcord (foldfunc (fromInput inbox)))
Run Code Online (Sandbox Code Playgroud)

原始生产者的输出被重定向到有界队列.同时,我们将producer-folding函数应用于从队列中读取的生产者.

每当并发操作完成时,我们会小心地立即关闭通道以避免另一侧悬空.