imm*_*ate 6 memory reduce haskell stream haskell-pipes
假设我有一个这样的模块:
module Explosion where
import Pipes.Parse (foldAll, Parser, Producer)
import Pipes.ByteString (ByteString, fromLazy)
import Pipes.Aeson (DecodingError)
import Pipes.Aeson.Unchecked (decoded)
import Data.List (intercalate)
import Data.ByteString.Lazy.Char8 (pack)
import Lens.Family (view)
import Lens.Family.State.Strict (zoom)
produceString :: Producer ByteString IO ()
produceString = fromLazy $ pack $ intercalate " " $ map show [1..1000000]
produceInts ::
Producer Int IO (Either (DecodingError, Producer ByteString IO ()) ())
produceInts = view decoded produceString
produceInts' :: Producer Int IO ()
produceInts' = produceInts >> return ()
parseBiggest :: Parser ByteString IO Int
parseBiggest = zoom decoded (foldAll max 0 id)
Run Code Online (Sandbox Code Playgroud)
'produceString'函数是一个bytestring生成器,我关心的是在它上面折叠一个解析来产生某种结果.
以下两个程序通过将其解析为一系列JSON int来显示解决在bytestring中查找最大值的问题的不同方法.
计划1:
module Main where
import Explosion (produceInts')
import Pipes.Prelude (fold)
main :: IO ()
main = do
biggest <- fold max 0 id produceInts'
print $ show biggest
Run Code Online (Sandbox Code Playgroud)
计划2:
module Main where
import Explosion (parseBiggest, produceString)
import Pipes.Parse (evalStateT)
main :: IO ()
main = do
biggest <- evalStateT parseBiggest produceString
print $ show biggest
Run Code Online (Sandbox Code Playgroud)
不幸的是,当我对它们进行分析时,这两个程序总共消耗了大约200MB的内存,我希望使用流式解析器可以解决这个问题.第一个程序花费大部分时间和内存(> 70%)(^.)来自Lens.Family,而第二个程序花费它fmap,zoom来自Lens.Family.State.Strict.使用图如下.两个程序都花费大约70%的时间进行垃圾收集.
难道我做错了什么?Prelude功能max不够严格吗?我无法判断库函数是否错误,或者我是否使用了库错误!(可能是后者.)
为了完整性,这里有一个git repo,你可以克隆并运行cabal install,如果你想看到我正在谈论的第一手,这里是两个程序的内存使用情况:

在单个字符串中包含严格的字节字符串yield并不会使其变得懒惰.您必须产生较小的块才能获得任何流式传输行为.
编辑:我发现了错误. pipes-aeson在内部使用如下consecutively定义的函数:
consecutively parser = step where
step p0 = do
(mr, p1) <- lift $
S.runStateT atEndOfBytes (p0 >-> PB.dropWhile B.isSpaceWord8)
case mr of
Just r -> return (Right r)
Nothing -> do
(ea, p2) <- lift (S.runStateT parser p1)
case ea of
Left e -> return (Left (e, p2))
Right a -> yield a >> step p2
Run Code Online (Sandbox Code Playgroud)
有问题的一行是PB.dropWhile.这增加了与解析元素数量成比例的二次爆破.
发生的事情是,cat在每次解析之后,穿过此计算的管道会在其下游累积新管道.因此,在N解析后,您将获得N个cat管道,这会为每个已解析的元素添加O(N)开销.
我已经创建了一个Github问题来解决这个问题. pipes-aeson由Renzo维护,他之前已经解决了这个问题.
编辑:我已经提交了一个拉取请求来修复第二个问题(你需要使用intercalatefor lazy bytestrings).现在,该程序以两个版本的5 KB常量空间运行:

