mod*_*lar 6 io haskell conduit
我使用haskell进行基于行的数据处理,即可以应用的任务sed
,awk
以及类似的工具.作为一个简单的例子,让我们000
从标准输入前面加上每一行.
我有三种替代方法来完成任务:
ByteString
sByteString
内部采用纯粹的严格处理.example.hs
:
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
import ClassyPrelude.Conduit
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Conduit.Binary as CB
main = do
[arg] <- getArgs
case arg of
"lazy" -> BL8.getContents >>= BL8.putStr . BL8.unlines . map ("000" ++) . BL8.lines
"lines" -> runConduitRes $ stdinC .| CB.lines .|
mapC ("000" ++) .| mapC (`snoc` 10) .| stdoutC
"chunks" -> runConduitRes $ stdinC .| lineChunksC .|
mapC (B8.unlines . (map ("000" ++)) . B8.lines) .| stdoutC
lineChunksC :: Monad m => Conduit ByteString m ByteString
lineChunksC = await >>= maybe (return ()) go
where
go acc = if
| Just (_, 10) <- unsnoc acc -> yield acc >> lineChunksC
| otherwise -> await >>= maybe (yield acc) (go' . breakAfterEOL)
where
go' (this, next) = let acc' = acc ++ this in if null next then go acc' else yield acc' >> go next
breakAfterEOL :: ByteString -> (ByteString, ByteString)
breakAfterEOL = uncurry (\x -> maybe (x, "") (first (snoc x)) . uncons) . break (== 10)
Run Code Online (Sandbox Code Playgroud)
$ stack ghc --package={classy-prelude-conduit,conduit-extra} -- -O2 example.hs -o example $ for cmd in lazy lines chunks; do echo $cmd; time -p seq 10000000 | ./example $cmd > /dev/null; echo; done lazy real 2.99 user 3.06 sys 0.07 lines real 3.30 user 3.36 sys 0.06 chunks real 1.83 user 1.95 sys 0.06
(结果在多次运行中是一致的,并且也适用于具有多个数字的行).
因此chunks
速度比lines
快一点快1.6倍lazy
.这意味着管道可以比普通的字节串更快,但是当您将块分成短线时,管道管道的开销太大.
我不喜欢的chunks
方法是它混合了管道和纯净的世界,并且使得它更难以用于更复杂的任务.
问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与lines
方法相同的方式编写高效的代码?
EDIT1:每@迈克尔的建议我在一起的两个mapC
成一个mapC (("000" ++). (
snoc 10))
的lines
解决方案,使管道的数量(.|
)之间的相同lines
和chunks
.这使它表现得更好(从3.3秒降至2.8秒),但仍然明显慢于chunks
.
我也尝试过Conduit.Binary.lines
Michael在评论中建议的旧版本,它也提高了性能,大约0.1秒.
EDIT2:修正了lineChunksC
它适用于非常小的块,例如
> runConduitPure $ yield ("\nr\n\n"::ByteString) .| concatC .| mapC singleton .| lineChunksC .| sinkList
["\n","r\n","\n"]
Run Code Online (Sandbox Code Playgroud)
我的猜测是,对于“线条”,该mapC ("000" ++) .| mapC (`snoc` 10)
部分做了很多工作。
将几个严格连接ByteStrings
到另一个严格ByteString
是昂贵的。将它们连接成惰性ByteString
往往会更有效。
为了避免这种成本,您可以将每个部分作为严格的下游单独生产ByteString
(但请注意,我们不再谈论“线”)。
或者,将每个转换后的行作为惰性ByteString
下游生成。
问题是,我是否错过了一个简单而优雅的解决方案,它允许我以与行方法相同的方式编写高效的代码?
一些流媒体库有一个有趣的功能:您可以在流中界定行并对其进行操作,而无需在任何时候在内存中具体化整行。
这里我使用streaming和streaming-bytestring包,因为我对它们更熟悉。
Data.ByteString.Streaming.Char8
在streaming -bytestring模块中,我们有以下lines
功能:
lines :: Monad m => ByteString m r -> Stream (ByteString m) m r
Run Code Online (Sandbox Code Playgroud)
lines 将 ByteString 转换为以换行符分隔的连接的 ByteString 流。生成的字符串不包含换行符。这是真正的流式传输线,它只会破坏块,因此不会增加内存的使用。
其要点是它已经ByteString m r
是流类型了!所以这个版本将流转换为“流的流”。而我们只能通过耗尽“当前流”(当前行)才能到达“下一个流”(下一行)。lines
您的“线条”示例可以写为:
{-# language OverloadedStrings #-}
module Main where
import Control.Applicative ((*>))
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString.Streaming.Char8 as Q
main :: IO ()
main = Q.stdout
. Q.unlines
. S.maps (\line -> "000" *> line)
. Q.lines
$ Q.stdin
Run Code Online (Sandbox Code Playgroud)