如何实现将两个消费者合二为一的 fork 功能

Pos*_*cat 6 continuations haskell stream deriving derivingvia

我正在尝试使用“更快的协程管道”一文中描述的抽象来构建一个流媒体库。我修改了代码,以便它正确处理管道退出(而不是在发生这种情况时抛出错误):

-- | r: return type of the continuation, i: input stream type, o: output stream type,
--   m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,
      Applicative,
      Monad
    )
    via (Cont (Result r m i o))

type Result r m i o = InCont r m i -> OutCont r m o -> m r

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok

suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok

emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk

await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)

yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)

(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok Nothing emptyIk

runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok
Run Code Online (Sandbox Code Playgroud)

我想实现一个功能

fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
Run Code Online (Sandbox Code Playgroud)

这将两个消费者流合并为一个(类似于管道的ZipSink)。它应该具有以下语义:

  1. 如果两个流都没有退出并且正在接受输入,则向两个流提供相同的输入值
  2. 如果一个流已退出,则存储返回值,然后将输入输入到接受该值的流中
  3. 如果两个流都已退出,则退出并将两个流的返回值放入一个元组中。

这是我的尝试:

我们重用了loop论文中将 a 连接InCont r m i到 2OutCont r m i并主动恢复延续的函数。

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'
Run Code Online (Sandbox Code Playgroud)

由于loop我们可以将生成的管道的输入同时连接到两个管道中,输出将在两个管道之间共享(这并不重要,因为您无法产生 a Void)。

fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
Run Code Online (Sandbox Code Playgroud)

现在我们只需要填写的延续fg将由被称为p并且q当他们退出。如果g在被调用时已经被调用f,表示q已经退出,那么f应该调用continuation k,如果g还没有被调用,那么f应该存储返回值a并恢复输入continuation(通过丢弃所有传递的值)似乎对我来说,如果没有某种形式的共享状态,就不可能实现这一目标。我们可以尝试m使用状态 monad来存储状态:

fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Left a))
              resumeIn ik' sinkOk
            Just (Right b) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
        g :: b -> Result r m i Void
        g b ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Right b))
              resumeIn ik' sinkOk
            Just (Left a) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
Run Code Online (Sandbox Code Playgroud)

sinkOk 是丢弃所有输入的输出延续:

sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
Run Code Online (Sandbox Code Playgroud)

我们现在可以添加一些辅助功能进行测试:

print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
  m <- await
  case m of
    Nothing -> pure ()
    Just i -> do
      lift $ liftIO (print i)
      print'

upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
  yield i
  upfrom (i + 1)

take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = do
    m <- await
    case m of
      Nothing -> pure ()
      Just i -> do
        yield i
        take' (n - 1)
Run Code Online (Sandbox Code Playgroud)

这在p退出时间早于 的情况下确实有效q

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'
Run Code Online (Sandbox Code Playgroud)

给出所需的输出:

1
1
2
2
3
3
((),())
Run Code Online (Sandbox Code Playgroud)

但是当q退出时间早于时,它会进入无限循环p

flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')
Run Code Online (Sandbox Code Playgroud)

输出:

1
1
2
2
<loops>
Run Code Online (Sandbox Code Playgroud)