提高基于线路的管道性能的方法

mod*_*lar 6 io haskell conduit

我使用haskell进行基于行的数据处理,即可以应用的任务sed,awk以及类似的工具.作为一个简单的例子,让我们000从标准输入前面加上每一行.

我有三种替代方法来完成任务:

  1. 惰性IO与懒惰ByteStrings
  2. 基于行的管道.
  3. 基于块的管道,ByteString内部采用纯粹的严格处理.

example.hs:

{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB

main = do
  [arg] <- getArgs
  case arg of

    "lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines

    "lines" -> runConduitRes $ stdinC .| CB.lines .|
      mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC

    "chunks" -> runConduitRes $ stdinC .| lineChunksC .|
      mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC


lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
  where
  go acc = if
    | Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
    | otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
    where
    go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next

breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
Run Code Online (Sandbox Code Playgroud)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
$ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
lazy
real 2.99
user 3.06
sys 0.07

lines
real 3.30
user 3.36
sys 0.06

chunks
real 1.83
user 1.95
sys 0.06

(结果在多次运行中是一致的,并且也适用于具有多个数字的行).

因此chunks速度比lines快一点快1.6倍lazy.这意味着管道可以比普通的字节串更快,但是当您将块分成短线时,管道管道的开销太大.

我不喜欢的chunks方法是它混合了管道和纯净的世界,并且使得它更难以用于更复杂的任务.

问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与lines方法相同的方式编写高效的代码?

EDIT1:每@迈克尔的建议我在一起的两个mapC成一个mapC (("000" ++). (snoc 10))lines解决方案,使管道的数量(.|)之间的相同lineschunks.这使它表现得更好(从3.3秒降至2.8秒),但仍然明显慢于chunks.

我也尝试过Conduit.Binary.linesMichael在评论中建议的旧版本,它也提高了性能,大约0.1秒.

EDIT2:修正了lineChunksC它适用于非常小的块,例如

> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList 
["\n","r\n","\n"]
Run Code Online (Sandbox Code Playgroud)

dan*_*iaz 3

我的猜测是,对于“线条”,该mapC ("000" ++) .| mapC (`snoc` 10)部分做了很多工作。

将几个严格连接ByteStrings到另一个严格ByteString是昂贵的。将它们连接成惰性ByteString往往会更有效。

为了避免这种成本,您可以将每个部分作为严格的下游单独生产ByteString(但请注意,我们不再谈论“线”)。

或者,将每个转换后的行作为惰性ByteString下游生成。


问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与行方法相同的方式编写高效的代码?

一些流媒体库有一个有趣的功能:您可以在流中界定行并对其进行操作,而无需在任何时候在内存中具体化整行。

这里我使用streamingstreaming-bytestring包,因为我对它们更熟悉。

Data.ByteString.Streaming.Char8在streaming -bytestring模块中,我们有以下lines功能:

lines :: Monad m => ByteString m r -> Stream (ByteString m) m r
Run Code Online (Sandbox Code Playgroud)

lines 将 ByteString 转换为以换行符分隔的连接的 ByteString 流。生成的字符串不包含换行符。这是真正的流式传输线,它只会破坏块,因此不会增加内存的使用。

其要点是它已经ByteString m r是流类型了!所以这个版本将流转换为“流的流”。而我们只能通过耗尽“当前流”(当前行)才能到达“下一个流”(下一行)。lines

您的“线条”示例可以写为:

{-# language OverloadedStrings #-}
module Main where

import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q

main :: IO ()
main = Q.stdout
     . Q.unlines
     . S.maps (\line -> "000" *> line)
     . Q.lines
     $ Q.stdin
Run Code Online (Sandbox Code Playgroud)