emc*_*sen 4 haskell haskell-pipes
这个问题有点像高尔夫球和很多新手.
我pipes在Haskell中使用了很棒的库,我想拆分管道以沿多个通道发送相同的数据(进行广播).本Pipes.Concurrent教程建议使用spawn创建邮箱,利用Output幺半群状态.例如,我们可能会这样做:
main = do
(output1, input1) <- spawn Unbounded
(output2, input2) <- spawn Unbounded
let effect1 = fromInput input1 >-> pipe1
let effect2 = fromInput input2 >-> pipe2
let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
...
Run Code Online (Sandbox Code Playgroud)
通过邮箱的这种间接是否真的有必要?我们可以写这样的东西吗?
main = do
let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
...
Run Code Online (Sandbox Code Playgroud)
以上不编译,因为Pipe没有Monoid实例.有这么好的理由吗?第一种方法真的是拆分管道最干净的方法吗?
有两种方法可以在不使用并发的情况下执行此操作,两者都有警告.
第一种方式是,如果pipe1和pipe2只是简单的Consumers表示永远循环下去,如:
p1 = for cat f -- i.e. p1 = forever $ await >>= f
p2 = for cat g -- i.e. p2 = forever $ await >>= g
Run Code Online (Sandbox Code Playgroud)
...那么解决这个问题的简单方法就是写:
for P.stdinLn $ \str -> do
f str
g str
Run Code Online (Sandbox Code Playgroud)
例如,如果p1只是print每个值:
p1 = for cat (lift . print)
Run Code Online (Sandbox Code Playgroud)
...而且p2只是将该值写入句柄:
p2 = for cat (lift . hPutStrLn h)
Run Code Online (Sandbox Code Playgroud)
...然后你会像这样组合它们:
for P.stdinLn $ \str -> do
lift $ print str
lift $ hPutStrLn h str
Run Code Online (Sandbox Code Playgroud)
但是,这种简化只适用于Consumer那些简单的循环.还有另一种更通用的解决方案,即为ArrowChoice管道定义实例.我认为基于拉的Pipes不允许正确的守法实例,但基于推送的Pipes做:
newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }
instance (Monad m) => Category (Edge m r) where
id = Edge push
(Edge p2) . (Edge p1) = Edge (p1 >~> p2)
instance (Monad m) => Arrow (Edge m r) where
arr f = Edge (push />/ respond . f)
first (Edge p) = Edge $ \(b, d) ->
evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
where
up () = do
(b, d) <- request ()
lift $ put d
return b
dn c = do
d <- lift get
respond (c, d)
instance (Monad m) => ArrowChoice (Edge m r) where
left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
where
bef x = case x of
Left b -> return b
Right d -> do
_ <- respond (Right d)
x2 <- request ()
bef x2
up () = do
x <- request ()
bef x
dn c = respond (Left c)
Run Code Online (Sandbox Code Playgroud)
这需要一个newtype,以便类型参数按ArrowChoice预期的顺序排列.
如果您不熟悉基于推送的术语Pipe,它基本上是Pipe从最上游管道而不是最下游管道开始的,它们都具有以下形状:
a -> Pipe a b m r
Run Code Online (Sandbox Code Playgroud)
可以把它想象成Pipe在从上游收到至少一个值之前不能"去".
这些基于推送的Pipes是传统的基于拉的s的"双重" Pipe,具有自己的组合运算符和标识:
(>~>) :: (Monad m)
=> (a -> Pipe a b m r)
-> (b -> Pipe b c m r)
-> (a -> Pipe a c m r)
push :: (Monad m)
-> a -> Pipe a a m r
Run Code Online (Sandbox Code Playgroud)
...但是Pipes默认情况下单向API不会导出它.您只能从中获取这些操作符Pipes.Core(并且您可能希望更密切地研究该模块以建立对其工作方式的直觉).该模块显示基于推送的Pipes和基于拉的Pipes都是更通用的双向版本的特殊情况,并且理解双向情况是你如何理解为什么它们是彼此的对偶.
有了Arrow基于推送的管道的实例后,您可以编写如下内容:
p >>> bifurcate >>> (p1 +++ p2)
where
bifurcate = Edge $ pull ~> \a -> do
yield (Left a) -- First give `p1` the value
yield (Right a) -- Then give `p2` the value
Run Code Online (Sandbox Code Playgroud)
然后runEdge,当您完成后,您将使用它将其转换为基于拉的管道.
这种方法有一个主要的缺点,即你不能自动将基于拉的管道升级到基于推进的管道(但通常不难弄清楚如何手动完成).例如,要升级Pipes.Prelude.map为基于推送Pipe,您可以编写:
mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
yield (f a)
Pipes.Prelude.map f
Run Code Online (Sandbox Code Playgroud)
那么它有正确的类型包含在Arrow:
mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)
Run Code Online (Sandbox Code Playgroud)
当然,更简单的方法就是从头开始编写:
mapEdge f = Edge $ push ~> yield . f
Run Code Online (Sandbox Code Playgroud)
使用最适合您的方法.
事实上,我提出了这些Arrow和ArrowChoice实例,正是因为我试图回答与你完全相同的问题:如何在不使用并发的情况下解决这些问题?我在这里的另一个Stack Overflow答案中写了一个关于这个更一般主题的长答案,在这里我描述了如何使用这些Arrow和ArrowChoice实例将并发系统提炼成等效的纯系统.