标签: haskell-pipes

如何拥有多种通信类型的管道?

说我有这个代码:

import Control.Monad.State hiding (StateT)
import Control.Proxy

server :: (Proxy p, Monad m) => Int -> Server p Int Bool (StateT Int m) ()
server = runIdentityK loop
    where loop arg = do
        currMax <- lift get
        lift $ put $ max currMax arg
        nextArg <- respond (even arg)
        loop nextArg

client :: (Proxy p, Monad m) => Client p Int Bool m ()
client = runIdentityP loop
    where loop = go 1
          go i = do
            isEven <- request …
Run Code Online (Sandbox Code Playgroud)

haskell haskell-pipes

6
推荐指数
1
解决办法
201
查看次数

使用管道4.0折叠流的子集

我正在尝试理解管道4.0,并希望转换一些管道代码.假设我有一个Ints 流,我想跳过前五个,然后得到以下的总和5.使用普通列表,这将是:

sum . take 5 . drop 5
Run Code Online (Sandbox Code Playgroud)

在管道中,这将是:

drop 5
isolate 5 =$ fold (+) 0
Run Code Online (Sandbox Code Playgroud)

或者作为一个完整的程序:

import Data.Conduit
import Data.Conduit.List (drop, isolate, fold)
import Prelude hiding (drop)

main :: IO ()
main = do
    res <- mapM_ yield [1..20] $$ do
        drop 5
        isolate 5 =$ fold (+) 0
    print res
Run Code Online (Sandbox Code Playgroud)

但是,我不太确定如何使用管道来做到这一点.

haskell conduit haskell-pipes

6
推荐指数
1
解决办法
548
查看次数

如何使用管道状态?

我有一个类型的功能Map Int String -> Proxy () a () Void IO b.现在它await是s,做任何有价值的东西,然后重新调用自己.我想把它更改为使用State (Map Int String)而不是将其作为参数传递,所以我可以使用forever并且不需要让每个分支都记得递归.我得到了我需要用来与另一个monad StateT结合State,但是我不明白那个类型签名StateT属于哪个,或者我是否需要lift像这样的函数get.对于a State (Map Int String)和a 的函数,正确的类型是Proxy () a () Void IO b什么?

haskell haskell-pipes

6
推荐指数
1
解决办法
649
查看次数

在Haskell中折叠生产者/分析器时的空间爆炸

假设我有一个这样的模块:

module Explosion where

import Pipes.Parse (foldAll, Parser, Producer)
import Pipes.ByteString (ByteString, fromLazy)
import Pipes.Aeson (DecodingError)
import Pipes.Aeson.Unchecked (decoded)
import Data.List (intercalate)
import Data.ByteString.Lazy.Char8 (pack)
import Lens.Family (view)
import Lens.Family.State.Strict (zoom)

produceString :: Producer ByteString IO ()
produceString = fromLazy $ pack $ intercalate " " $ map show [1..1000000]

produceInts :: 
    Producer Int IO (Either (DecodingError, Producer ByteString IO ()) ())
produceInts = view decoded produceString

produceInts' :: Producer Int IO ()
produceInts' = produceInts >> return ()

parseBiggest …
Run Code Online (Sandbox Code Playgroud)

memory reduce haskell stream haskell-pipes

6
推荐指数
1
解决办法
218
查看次数

向下游发信号通知上游耗尽

使用Haskell 管道库,我正在尝试Pipe使用以下类型定义:

signalExhausted :: Monad m => Pipe a (Value a) m r 
Run Code Online (Sandbox Code Playgroud)

其中Value数据类型被定义为:

data Value a = Value a | Exhausted
Run Code Online (Sandbox Code Playgroud)

管道应遵守以下法律:

toList (each [] >-> signalExhausted) ==                 [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]
Run Code Online (Sandbox Code Playgroud)

换句话说,管道应该相当于Pipes.Prelude.map Value,除了Exhausted在处理完所有上游值之后它应该产生额外的,使下游有机会执行一些最终操作.

可以这样Pipe定义吗?

> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", …
Run Code Online (Sandbox Code Playgroud)

haskell haskell-pipes

6
推荐指数
1
解决办法
159
查看次数

如何将基于拉力的管道转变为基于推力的管道?

默认情况下,管道是基于拉的.这是由于实施的操作员>->,通过+>>bind操作员是他的拉动类别的有意义的操作员.我的理解是,这意味着如果你有代码producer >-> consumer,首先会调用消费者的身体,然后一旦等待数据,就会调用生产者.

我见过的在pipes文档在这里,你可以使用代码(reflect .)Pipes.Core把一个基于拉管道进入基于推送管.这意味着(纠正我,如果我错了)在上面的代码中producer >-> consumer,生产者先运行,产生一个值,然后消费者试图消费.这似乎非常有用,我想知道如何做到这一点.

我在这里的讨论中也看到没有基于推送的对应物,>->因为它很容易转换任何管道(我假设有反射?),但我无法真正想出如何做或找到任何例子.

这是我尝试过的一些代码:

stdin :: Producer String IO r
stdin = forever $ do
  lift $ putStrLn "stdin"
  str <- lift getLine
  yield str

countLetters :: Consumer String IO r
countLetters = forever $ do
  lift $ putStrLn "countLetters"
  str <- await
  lift . putStrLn . show . length $ str

-- …
Run Code Online (Sandbox Code Playgroud)

haskell haskell-pipes

6
推荐指数
1
解决办法
439
查看次数

在没有缓冲的情况下读取大文件中的大行

我想知道是否有一种简单的方法可以一次从文件中获取一行,而不是最终将整个文件加载到内存中.我想用attoparsec解析器在线上进行折叠.我尝试使用Data.Text.Lazy.IO,hGetLine这打击了我的记忆.我后来读到最终加载整个文件.

我使用也试过管道文本foldsview lines:

s <- Pipes.sum $ 
    folds (\i _ -> (i+1)) 0 id (view Text.lines (Text.fromHandle handle))
print s
Run Code Online (Sandbox Code Playgroud)

只计算行数,它似乎做了一些不稳定的东西"hGetChunk:无效的参数(无效的字节序列)",需要11分钟,wc -l需要1分钟.我听说管道文本可能有一些巨大的线条问题?(每行约1GB)

我对任何建议都很开放,除了新手readLine怎么样之外找不到多少搜索.

谢谢!

haskell haskell-pipes

6
推荐指数
1
解决办法
227
查看次数

管道教程:ListT示例

我试图弄清楚管道教程中提到的一个例子ListT:

import Pipes
import qualified Pipes.Prelude as P

input :: Producer String IO ()
input = P.stdinLn >-> P.takeWhile (/= "quit")

name :: ListT IO String
name = do
    firstName <- Select input
    lastName  <- Select input
    return (firstName ++ " " ++ lastName)
Run Code Online (Sandbox Code Playgroud)

如果运行上面的示例,我们得到如下输出:

>>> runEffect $ every name >-> P.stdoutLn
Daniel<Enter>
Fischer<Enter>
Daniel Fischer
Wagner<Enter>
Daniel Wagner
quit<Enter>
Donald<Enter>
Stewart<Enter>
Donald Stewart
Duck<Enter>
Donald Duck
quit<Enter>
quit<Enter>
>>> 
Run Code Online (Sandbox Code Playgroud)

看起来:

  1. 当你运行它(在ghci上)时,你输入的第一个名字将被绑定,只有第二个名称会改变.我希望两个生产者(定义为Select input)在读取输入时轮流(可能是非确定性的).
  2. 输入 …

haskell haskell-pipes

6
推荐指数
1
解决办法
281
查看次数

如何通过每次传入重置的超时来管道?

withTimeout功能是假设管道ConsoleEventCeTimeout发送的每一个s :: Int,如果没有已收到秒.相反,它无法CeTimeout在适当的时间发送事件.CeTimeout如果s在原始事件丢失的情况下超过秒数,则会将一个事件替换为其他事件.而不是一个CeTimeout事件,它应该是已经过的每个第二个周期的计数n*s CeTimeout事件.错误在哪里,纠正是什么?谢谢!ns

withTimeout :: (MonadIO t) => Int -> Pipe ConsoleEvent ConsoleEvent t ()
withTimeout ((* 1000000) -> s) = join . liftIO $ work
  where
    work :: (MonadIO t) => IO (Pipe ConsoleEvent ConsoleEvent t ()) 
    work =
      do
        (oSent, iKept) <- spawn $ bounded 1
        (oKept, iSent) <- spawn $ unbounded
        (oTimeout, iTimeout) <- spawn $ …
Run Code Online (Sandbox Code Playgroud)

concurrency haskell haskell-pipes

6
推荐指数
1
解决办法
165
查看次数

Haskell管道:如何将bytestring写入文件

我有以下代码应该为[Word8]创建管道使用者并将其输出到文件.

import Pipes
import qualified Pipes.Binary as PB
import qualified Pipes.ByteString as PBS
import qualified System.IO as SIO 

save :: String -> Consumer [Word8] IO ()
save filename = do
  fh <- lift $ SIO.openFile filename WriteMode
  _ <- Pipes.for cat PB.encode >-> PBS.toHandle fh
  lift $ SIO.hClose fh
Run Code Online (Sandbox Code Playgroud)

但是,没有输出(尽管文件被创建).如果我用"PBS.stdout"替换"PBS.toHandle fh",我得到输出(到stdout).

我也尝试使用SIO.withFile和我创建的整个管道作为参数

 SIO.withFile "output.bin" WriteMode $ \fh -> 
   runEffect $ input-processing >-> Pipes.for cat PB.encode >-> PBS.toHandle fh
Run Code Online (Sandbox Code Playgroud)

它的工作原理.但在这种情况下,我对如何将SIO.withFile放入具有上述签名的函数(仅返回使用者)中一无所知.

谢谢你解释我做错了什么.

haskell haskell-pipes

6
推荐指数
0
解决办法
103
查看次数

标签 统计

haskell ×10

haskell-pipes ×10

concurrency ×1

conduit ×1

memory ×1

reduce ×1

stream ×1