标签: haskell-pipes

管道中的"子解析器" - attoparsec

我正在尝试使用Haskell中的pipes-attoparsec解析二进制数据.管道(代理)涉及的原因是将读取与解析交错以避免大文件的高内存使用.许多二进制格式基于块(或块),它们的大小通常由文件中的字段描述.我不确定这个块的解析器是什么被调用,但这就是我所说的标题中的"sub-parser".我遇到的问题是以简洁的方式实现它们而没有可能很大的内存占用.我想出了两个在某些方面都失败的替代方案.

备选方案1是将块读入单独的字节串并为其启动单独的解析器.简洁,大块将导致高内存使用.

备选方案2是在相同的上下文中保持解析并跟踪消耗的字节数.这种跟踪容易出错,并且似乎会影响组成最终blockParser的所有解析器.对于格式错误的输入文件,在比较跟踪大小之前,还可以通过进一步解析比大小字段所指示的方式来浪费时间.

import Control.Proxy.Attoparsec
import Control.Proxy.Trans.Either
import Data.Attoparsec as P
import Data.Attoparsec.Binary
import qualified Data.ByteString as BS

parser = do
    size <- fromIntegral <$> anyWord32le

    -- alternative 1 (ignore the Either for simplicity):
    Right result <- parseOnly blockParser <$> P.take size
    return result

    -- alternative 2
    (result, trackedSize) <- blockparser
    when (size /= trackedSize) $ fail "size mismatch"
    return result

blockParser = undefined

main = withBinaryFile "bin" ReadMode go where
    go h = fmap print . runProxy . runEitherK …
Run Code Online (Sandbox Code Playgroud)

parsing haskell attoparsec haskell-pipes

5
推荐指数
1
解决办法
759
查看次数

我可以从Control.Proxy生成StateP MonadState的实例吗?

我改变了一些代码,用来在内部运行的StateT单子到内运行StatePControl.Proxy.但是,我的一些代码(例如%=运算符来自Control.Lens)需要一个MonadState实例.我只是添加这样的实例是安全/正确的吗?这似乎是库最正确处理的东西(在本例中Control.Proxy).

monads haskell haskell-pipes

5
推荐指数
1
解决办法
147
查看次数

这是一个多用户网络服务器的理智架构吗?(管道并发引入了多少开销?)

我目前有它,所以有一个线程处理接受循环,一个主线程用于执行所有有状态逻辑的东西,然后每个连接客户端2个线程.一个客户端线程正在处理输入管道并使用管道并发将消息发送到主逻辑线程.另一个客户端线程正在处理输出管道,从主逻辑线程获取消息并将它们发送到客户端.

我这样做的理由是主逻辑线程可以使用工作线程对不可变状态进行纯计算,然后立即执行所有状态更改并使用新状态循环回来.这样我就可以使用多个CPU,而不必担心并发状态修改的问题.

STM /管道 - 并发的开销是否足够小,这是一种合理的方法,当我最终会有几千个连接的客户端每秒发送两到三条消息时?

haskell haskell-pipes

5
推荐指数
1
解决办法
182
查看次数

根据时间限制管道?

是否可以创建获取在特定时间段内向下游发送的所有值的管道?我正在实现一个服务器,协议允许我连接传出的数据包并将它们压缩在一起,所以我想ByteString每隔100ms 有效地"清空"下游的队列,mappend然后将它们放在一起,然后输出到下一个管道.做压缩.

haskell haskell-pipes

5
推荐指数
1
解决办法
258
查看次数

管道和非管道代码之间的并发考虑

我正在为管道接口中的某些编码包装C库,但我已经做了一些需要做出的设计决策.

在设置C库之后,我们保持编码器上下文.有了这个,我们可以编码,或者改变一些参数(让我们将Haskell接口调用到最后一个函数tune :: Context -> Int -> IO ()).我的问题分为两部分:

  1. 编码部分很容易包装成一个Pipe Foo Bar IO (),但我也想曝光tune.由于编码上下文的同时使用必须受到锁定保护,因此我需要在管道中的每次迭代时锁定,并tune使用相同的锁进行保护.但现在我觉得我正在强迫用户隐藏锁定.我在这里吠叫错了吗?这种情况通常如何在管道生态系统中得到解决?在我的情况下,我希望我的特定代码所属的管道始终在其自己的线程中运行,同时进行调整,但我不想强迫任何用户使用这种观点.管道生态系统中的其他软件包似乎也不会强迫用户使用.
  2. 不再使用的编码上下文需要正确地去初始化.在管道生态系统中,如何确保IO在管道被破坏时处理这些事情(在这种情况下执行som 动作)?

一个具体的例子是包装压缩库,在这种情况下,上面可以是:

  1. 压缩强度是可调的.我们设置了管道并快速运行.假设必须序列化对压缩编解码器上下文的并发访问,那么最好如何在管道保持运行时允许更改压缩强度设置?
  2. 压缩库在设置时从Haskell堆中分配了一堆内存,当管道被拆除时,我们需要调用一些库函数来清理它.

谢谢......这可能都很明显,但我对管道生态系统还很陌生.

编辑:发帖后阅读,我很确定这是我在这里问过的最模糊的问题.啊! 对不起;-)

haskell api-design haskell-pipes

5
推荐指数
1
解决办法
218
查看次数

Haskell Pipes - 获取管道中最后一个代理的返回值

假设我Proxy在 Haskell Pipes 中有两个。它们代表外部系统进程。

produce :: MonadIO m => Producer ByteString m ExitCode
consume :: MonadIO m => Consumer ByteString m ExitCode
Run Code Online (Sandbox Code Playgroud)

所以我把它们挂在一个Effect,像这样:

effect :: Effect m ExitCode
effect = produce >-> consume
Run Code Online (Sandbox Code Playgroud)

这将从终止的第一个Effect开始给我。通常,这将是,而不是。即使它没有首先终止,获得返回值的惯用 Pipes 方法是什么?ExitCodeProxyproduceconsumeconsume

到目前为止,我认为如果不进行某种令人讨厌的带内信令,这是不可能的,因此consume知道流已完成。最后一个代理知道关闭的唯一方法是从 中获取一些东西await,所以我可以向它发送一个空ByteString的信号来表示流已完成。只是感觉不太对劲。我现在拥有的是一个单独的 MVar,它可以提供退出值,但我认为必须有一种更惯用的方法来做到这一点。

haskell haskell-pipes

5
推荐指数
1
解决办法
235
查看次数

管道定义内部函数的原因

我正在查看管道库的源代码,例如在Core模块中,我不明白为什么作者到处都是使用定义函数的模式:

runEffect = go
  where
    go p = ...
Run Code Online (Sandbox Code Playgroud)

要么:

pull = go
  where
    go a' = ...
Run Code Online (Sandbox Code Playgroud)

要么:

reflect = go
  where
    go p = ...
Run Code Online (Sandbox Code Playgroud)

这是一些启用某些优化的技巧吗?我发现它很难看,如果它是一些优化技巧我真的希望编译器可以在没有这样的情况下做到这一点.但也许还有另一个原因?

haskell ghc haskell-pipes

5
推荐指数
1
解决办法
131
查看次数

Haskell 中字节流的高效流式传输和操作

在为大型(<bloblength><blob>)*编码二进制文件编写反序列化器时,我陷入了各种 Haskell 生产-转换-消费库的困境。到目前为止,我知道四个流媒体库:

这是一个简单的示例,说明当我尝试使用 进行Word32流式传输时,会出现问题conduit。一个稍微更实际的示例将首先读取Word32确定 blob 长度的 a,然后生成ByteString该长度的惰性(然后进一步反序列化)。但在这里我只是尝试以流方式从二进制文件中提取 Word32:

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified …
Run Code Online (Sandbox Code Playgroud)

streaming haskell bytestring haskell-pipes bytestream

5
推荐指数
1
解决办法
1075
查看次数

在Haskell,管道,attoparsec和容器中优化内存

我正在尝试进一步优化我的管道 - attoparsec解析器和存储,但无法获得更低的内存使用率.

给出account-parser.hs

{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoImplicitPrelude #-}

import Protolude hiding (for)

import Data.Hashable
import Data.IntMap.Strict (IntMap)
import Data.Vector (Vector)
import Pipes
import Pipes.Parse
import Pipes.Safe (MonadSafe, runSafeT)
import qualified Data.Attoparsec.ByteString.Char8 as AB
import qualified Data.IntMap.Strict as IM
import qualified Data.Vector as Vector
import qualified Pipes.Attoparsec as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Safe.Prelude as PSP

-- accountid|account-name|contractid|code

data AccountLine = AccountLine {
    _accountId         :: !ByteString,
    _accountName       :: !ByteString,
    _accountContractId …
Run Code Online (Sandbox Code Playgroud)

optimization haskell haskell-pipes

5
推荐指数
1
解决办法
133
查看次数

Haskell的FreeT和协程类型之间有什么关系

Monad.Reader第19期中的“协程管道”文章中,作者定义了一个泛型Coroutine类型:

newtype Coroutine f m a = Coroutine
  { resume :: m (Either (f (Coroutine f m a)) a)
  }
Run Code Online (Sandbox Code Playgroud)

我注意到这种类型FreeTfree包中的类型非常相似:

data FreeF f a b = Pure a | Free (f b)

newtype FreeT f m a = FreeT
  { runFreeT :: m (FreeF f a (FreeT f m a))
  }
Run Code Online (Sandbox Code Playgroud)

看来FreeTCoroutine是同构的。这是从一个映射到另一个的函数:

freeTToCoroutine
  :: forall f m a. (Functor f, Functor m) => FreeT f m a …
Run Code Online (Sandbox Code Playgroud)

haskell coroutine conduit free-monad haskell-pipes

5
推荐指数
1
解决办法
350
查看次数