Pos*_*cat 6 continuations haskell stream deriving derivingvia
我正在尝试使用“更快的协程管道”一文中描述的抽象来构建一个流媒体库。我修改了代码,以便它正确处理管道退出(而不是在发生这种情况时抛出错误):
-- | r: return type of the continuation, i: input stream type, o: output stream type,
-- m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
deriving
( Functor,
Applicative,
Monad
)
via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}
suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk
await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)
(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
runPipe
q
(\a _ ok' -> k a emptyIk ok')
(suspendIn (runPipe p (\() -> f)) ik)
ok
where
f :: Result r m i e
f _ ok = resumeOut ok Nothing emptyIk
runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
where
ik :: InCont a m ()
ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
ok :: OutCont a m Void
ok = MakeOutCont \_ ik' -> resumeIn ik' ok
Run Code Online (Sandbox Code Playgroud)
我想实现一个功能
fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
Run Code Online (Sandbox Code Playgroud)
这将两个消费者流合并为一个(类似于管道的ZipSink)。它应该具有以下语义:
这是我的尝试:
我们重用了loop论文中将 a 连接InCont r m i到 2OutCont r m i并主动恢复延续的函数。
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
resumeIn ik $ MakeOutCont \v ik' ->
resumeOut ok1 v $ MakeInCont \ok1' ->
resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'
Run Code Online (Sandbox Code Playgroud)
由于loop我们可以将生成的管道的输入同时连接到两个管道中,输出将在两个管道之间共享(这并不重要,因为您无法产生 a Void)。
fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
MakePipe \k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = _
g :: b -> Result r m i Void
g b ik' ok' = _
in runPipe
p
f
(MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
ok
Run Code Online (Sandbox Code Playgroud)
现在我们只需要填写的延续f和g将由被称为p并且q当他们退出。如果g在被调用时已经被调用f,表示q已经退出,那么f应该调用continuation k,如果g还没有被调用,那么f应该存储返回值a并恢复输入continuation(通过丢弃所有传递的值)似乎对我来说,如果没有某种形式的共享状态,就不可能实现这一目标。我们可以尝试m使用状态 monad来存储状态:
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
MakePipe \k ik ok ->
let f :: a -> Result r m i Void
f a ik' ok' = do
s <- get
case s of
Nothing -> do
put (Just (Left a))
resumeIn ik' sinkOk
Just (Right b) -> do
k (a, b) ik' ok'
_ -> error "unexpected state"
g :: b -> Result r m i Void
g b ik' ok' = do
s <- get
case s of
Nothing -> do
put (Just (Right b))
resumeIn ik' sinkOk
Just (Left a) -> do
k (a, b) ik' ok'
_ -> error "unexpected state"
in runPipe
p
f
(MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
ok
Run Code Online (Sandbox Code Playgroud)
sinkOk 是丢弃所有输入的输出延续:
sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
Run Code Online (Sandbox Code Playgroud)
我们现在可以添加一些辅助功能进行测试:
print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
m <- await
case m of
Nothing -> pure ()
Just i -> do
lift $ liftIO (print i)
print'
upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
yield i
upfrom (i + 1)
take' :: Int -> ContPipe r i i m ()
take' n
| n <= 0 = pure ()
| otherwise = do
m <- await
case m of
Nothing -> pure ()
Just i -> do
yield i
take' (n - 1)
Run Code Online (Sandbox Code Playgroud)
这在p退出时间早于 的情况下确实有效q:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'
Run Code Online (Sandbox Code Playgroud)
给出所需的输出:
1
1
2
2
3
3
((),())
Run Code Online (Sandbox Code Playgroud)
但是当q退出时间早于时,它会进入无限循环p:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take 2 print')
Run Code Online (Sandbox Code Playgroud)
输出:
1
1
2
2
<loops>
Run Code Online (Sandbox Code Playgroud)