Haskell 中字节流的高效流式传输和操作

mcm*_*yer 5 streaming haskell bytestring haskell-pipes bytestream

在为大型(<bloblength><blob>)*编码二进制文件编写反序列化器时,我陷入了各种 Haskell 生产-转换-消费库的困境。到目前为止,我知道四个流媒体库:

这是一个简单的示例,说明当我尝试使用 进行Word32流式传输时,会出现问题conduit。一个稍微更实际的示例将首先读取Word32确定 blob 长度的 a,然后生成ByteString该长度的惰性(然后进一步反序列化)。但在这里我只是尝试以流方式从二进制文件中提取 Word32:

module Main where

-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary

import           Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get              as G
import qualified Data.ByteString              as BS
import qualified Data.ByteString.Char8        as C
import qualified Data.ByteString.Lazy         as BL
import           Data.Conduit
import qualified Data.Conduit.Binary          as CB
import qualified Data.Conduit.List            as CL
import           Data.Word                    (Word32)
import           System.Environment           (getArgs)

-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
    G.runGet G.getWord32be $ BL.fromStrict bs

-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
    mbs <- await
    case mbs of
        Just bs -> do
            case C.null bs of
                False -> do
                    yield $ getWord32 bs
                    leftover $ BS.drop 4 bs
                    transform
                True -> return ()
        Nothing -> return ()

main :: IO ()
main = do
    filename <- fmap (!!0) getArgs  -- should check length getArgs
    result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
    print $ length result   -- is always 8188 for files larger than 32752 bytes
Run Code Online (Sandbox Code Playgroud)

程序的输出只是读取的 Word32 的数量。事实证明,流在读取第一个块(大约 32KiB)后终止。由于某种原因mbs是 never Nothing,所以我必须检查null bs当块被消耗时哪个停止流。显然,我的管道transform有故障。我看到解决方案有两种途径:

  1. 不想await转到 的第二个块ByteStream,那么是否有另一个函数可以提取下一个块?在我见过的示例中(例如 Conduit 101)这不是它的完成方式
  2. 这只是错误的设置方式transform

如何正确完成此操作?这是正确的方法吗?(性能确实很重要。)

更新:这是一个糟糕的方法,使用Systems.IO.Streams

module Main where

import           Data.Word                (Word32)
import           System.Environment       (getArgs)
import           System.IO                (IOMode (ReadMode), openFile)
import qualified System.IO.Streams        as S
import           System.IO.Streams.Binary (binaryInputStream)
import           System.IO.Streams.List   (outputToList)

main :: IO ()
main = do
    filename : _ <- getArgs
    h <- openFile filename ReadMode
    s <- S.handleToInputStream h
    i <- binaryInputStream s :: IO (S.InputStream Word32)
    r <- outputToList $ S.connect i
    print $ last r
Run Code Online (Sandbox Code Playgroud)

“Bad”意味着:对时间和空间要求很高,不处理 Decode 异常。

dup*_*ode 3

您面临的直接问题是由您的使用方式引起的leftover。该函数用于“提供单个剩余输入以供当前单子绑定中的下一个组件使用”,因此当您在bs循环之前给出它时,transform您实际上会丢弃字节串的其余部分(即什么是后bs)。

基于您的代码的正确解决方案将使用增量输入接口将您的/组合Data.Binary.Get替换为完全消耗每个块的内容。不过,更实用的方法是使用binary-conduit包,它以以下形式提供(其源代码很好地了解了“手动”实现的样子):yieldleftoverconduitGet

import           Data.Conduit.Serialization.Binary

-- etc.

transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be
Run Code Online (Sandbox Code Playgroud)

需要注意的是,如果字节总数不是 4 的倍数(即最后一个Word32不完整),这将引发解析错误。在不太可能的情况下,这不是您想要的,一种懒惰的方法是简单地\bs -> C.take (4 * truncate (C.length bs / 4)) bs在输入字节串上使用。