mod*_*lar 6 io haskell conduit
我使用haskell进行基于行的数据处理,即可以应用的任务sed,awk以及类似的工具.作为一个简单的例子,让我们000从标准输入前面加上每一行.
我有三种替代方法来完成任务:
ByteStringsByteString内部采用纯粹的严格处理.example.hs:
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB
main = do
[arg] <- getArgs
case arg of
"lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines
"lines" -> runConduitRes $ stdinC .| CB.lines .|
mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC
"chunks" -> runConduitRes $ stdinC .| lineChunksC .|
mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC
lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
where
go acc = if
| Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
| otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
where
go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next
breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
Run Code Online (Sandbox Code Playgroud)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example
$ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done
lazy
real 2.99
user 3.06
sys 0.07
lines
real 3.30
user 3.36
sys 0.06
chunks
real 1.83
user 1.95
sys 0.06
(结果在多次运行中是一致的,并且也适用于具有多个数字的行).
因此chunks速度比lines快一点快1.6倍lazy.这意味着管道可以比普通的字节串更快,但是当您将块分成短线时,管道管道的开销太大.
我不喜欢的chunks方法是它混合了管道和纯净的世界,并且使得它更难以用于更复杂的任务.
问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与lines方法相同的方式编写高效的代码?
EDIT1:每@迈克尔的建议我在一起的两个mapC成一个mapC (("000" ++). (snoc 10))的lines解决方案,使管道的数量(.|)之间的相同lines和chunks.这使它表现得更好(从3.3秒降至2.8秒),但仍然明显慢于chunks.
我也尝试过Conduit.Binary.linesMichael在评论中建议的旧版本,它也提高了性能,大约0.1秒.
EDIT2:修正了lineChunksC它适用于非常小的块,例如
> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList
["\n","r\n","\n"]
Run Code Online (Sandbox Code Playgroud)
我的猜测是,对于“线条”,该mapC ("000" ++) .| mapC (`snoc` 10)部分做了很多工作。
将几个严格连接ByteStrings到另一个严格ByteString是昂贵的。将它们连接成惰性ByteString往往会更有效。
为了避免这种成本,您可以将每个部分作为严格的下游单独生产ByteString(但请注意,我们不再谈论“线”)。
或者,将每个转换后的行作为惰性ByteString下游生成。
问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与行方法相同的方式编写高效的代码?
一些流媒体库有一个有趣的功能:您可以在流中界定行并对其进行操作,而无需在任何时候在内存中具体化整行。
这里我使用streaming和streaming-bytestring包,因为我对它们更熟悉。
Data.ByteString.Streaming.Char8在streaming -bytestring模块中,我们有以下lines功能:
lines :: Monad m => ByteString m r -> Stream (ByteString m) m r
Run Code Online (Sandbox Code Playgroud)
lines 将 ByteString 转换为以换行符分隔的连接的 ByteString 流。生成的字符串不包含换行符。这是真正的流式传输线,它只会破坏块,因此不会增加内存的使用。
其要点是它已经ByteString m r是流类型了!所以这个版本将流转换为“流的流”。而我们只能通过耗尽“当前流”(当前行)才能到达“下一个流”(下一行)。lines
您的“线条”示例可以写为:
{-# language OverloadedStrings #-}
module Main where
import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q
main :: IO ()
main = Q.stdout
. Q.unlines
. S.maps (\line -> "000" *> line)
. Q.lines
$ Q.stdin
Run Code Online (Sandbox Code Playgroud)