我想将相同的数据分成两个"分支"进行单独处理,然后"加入"......
+----------+
+---------+ -->| doublber |--- +--------+
+--------+ | |-- +----------+ -->| | +------+
| source |-->| splitter| | summer |-->| sink |
+--------+ | |-- +----------+ -->| | +------+
+---------+ -->| delayer |--- +--------+
+----------+
Run Code Online (Sandbox Code Playgroud)
我该怎么做?
我的尝试:
import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
-- import Data.Conduit.Internal (zipSources)
import Control.Arrow ((>>>))
source :: Source IO Int
source = do
x <- liftIO $ getLine
yield (read x)
source
splitter :: Conduit Int IO (Int, Int)
splitter = CL.map $ \x -> (x,x)
doubler = CL.map (* 2)
delayer :: Conduit Int IO Int
delayer = do
yield 0
CL.map id
twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = awaitForever $ \(x, y) -> do
out1 <- undefined q x
out2 <- undefined w y
yield (out1, out2)
summer :: Conduit (Int,Int) IO Int
summer = CL.map $ \(x,y) -> x + y
sink :: Sink Int IO ()
sink = CL.mapM_ (show >>> putStrLn)
-- combosrc = zipSources (source $= delayer) (source $= doubler)
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink
Run Code Online (Sandbox Code Playgroud)
我应该写什么代替undefined
s?
你可以这样做,但它很丑陋,希望实现能够清楚地说明为什么它很丑陋,而不是管道的内置功能:
twoConduitBranches :: Monad m => Conduit a m c -> Conduit b m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = getZipConduit
(ZipConduit (CL.map fst =$= q =$= CL.map Left)
<* ZipConduit (CL.map snd =$= w =$= CL.map Right)) =$= collapse
where
collapse = do
v1 <- await
case v1 of
Nothing -> return ()
Just (Left _) -> error "out of sequence 1"
Just (Right d) -> do
v2 <- await
case v2 of
Nothing -> error "mismatched count"
Just (Right _) -> error "out of sequence 2"
Just (Left c) -> do
yield (c, d)
collapse
Run Code Online (Sandbox Code Playgroud)
(注意:我稍微调整了你的类型签名,我认为这是你真正想要的类型签名。)
方法如下:转为q
a Conduit
,从每个传入元组中获取第一个值,然后用 包装其输出Left
。类似地,我们从每个传入元组中获取第二个值并将其传递给w
,然后用 包装输出Right
。
现在这些Conduit
具有相同的类型(它们接受相同的输入元组,并生成相同的 Either 值),我们使用 组合它们ZipConduit
,它在所有组件之间共享输入并将输出合并到单个流中。
这个流就是流Either c d
,不是想要的(c, d)
。为了进行最终的转换,我们使用collapse
. 它弹出 aRight
和Left
value,然后将它们组合成一个生成的元组。
此函数假设输出值序列始终是来自 的一个值w
,然后是来自 的一个值q
。如果发生其他情况,它将抛出异常。问题是:管道中没有任何内容表明它们实际上会以相同的速率产生输出。事实上,管道是专门为避免这种假设而设计的!
因此,如果您知道两个组件导管将始终以相同的速率产生输出,则此功能将起作用。但一般来说,情况并非如此。
归档时间: |
|
查看次数: |
390 次 |
最近记录: |