Haskell batched file processing does not improve the space profile

ger*_*ben 8 profiling haskell heap-memory

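I have a simple algorithm to implement: compare every line with every other line. Each line contains one number, and the comparison function is the distance. The sum of all distances is the final result.

This can be implemented simply as follows: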

sumOfDistancesOnSmallFile :: FilePath -> IO Integer
sumOfDistancesOnSmallFile path = withFile path ReadMode $ \h->do
                          distances <- liftM ( (map textRead) ) $ hListToEOF Text.hGetLine h
                          -- note: `offset` is not bound anywhere in this snippet as posted
                          let subSet = drop offset distances
                          let emptySubSet = null subSet
                          return $ if (emptySubSet)
                                           then (0)
                                           else (distancesManyToMany subSet)

hListToEOF :: (Handle -> IO a) -> Handle -> IO [a]
hListToEOF func h = do
    element <- func h
    atEOF <- hIsEOF h
    rest <- case(atEOF) of
        True -> return []
        False -> hListToEOF func h
    return $ element:rest

distancesManyToMany :: [Integer]->Integer
distancesManyToMany (x:xs) = distancesOneToMany x xs + (distancesManyToMany xs)
distancesManyToMany _ = 0

distancesOneToMany :: Integer -> [Integer] -> Integer
distancesOneToMany one many = sum $ map (distance one) many

distance :: Integer -> Integer -> Integer
distance a b = (a-b)

To get reasonably large numbers on each line, I used the following file generator:

createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Integer->Integer-> [Integer]
          infiniteList i j = (i+j) * (i+j) : infiniteList j (i+j)

A 2000-line file (840 KB) takes 1.92 seconds and 1.5 GB of allocation, with a maximum memory usage of about 1.5 MB.

A 6k-line file (7.5 MB) takes 22 seconds and 34 GB of allocation, with a maximum memory usage of about 15 MB.

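Unfortunately, my data will be millions of lines. I initially tried to improve the speed (which I asked about in two earlier posts on MapReduce and Iteratee IO), but the actual limiting problem is space.

Intermediate thought: this could be overcome by re-reading the complete file for each number to be compared. That costs a lot of extra time, because the file has to be opened and parsed once for every line that is compared with the rest of the file. The amount of memory allocation would also become quadratic. So this is not really a final solution.

Final step: this brings me to my first real step toward the goal: batched execution. I want to take a few thousand lines into memory and apply the ManyToMany algorithm to the lines held in memory. Then I iterate through the rest of the file. At each iteration step only one successive line needs to be read and parsed, and it can then be compared with all items in the in-memory batch.

By choosing a batch size that is large enough, the file does not have to be re-read often. My implementation is as follows: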

sumOfDistancesOnBigFileUsingBatches :: FilePath -> Int -> Int -> IO Integer
sumOfDistancesOnBigFileUsingBatches path batchSize offset = do
                      (firstResult, maybeRecurse) <- singleResultBatch path batchSize offset
                      recursiveResult <- case maybeRecurse of
                                             Nothing -> return 0
                                             Just newOffset -> sumOfDistancesOnBigFileUsingBatches path batchSize newOffset
                      return $ firstResult + recursiveResult

singleResultBatch :: FilePath -> Int -> Int -> IO(Integer, Maybe Int)
singleResultBatch path batchSize offset = withFile path ReadMode $ \h->do
                          distances <- readDistances h
                          let (batch, subSet) = splitAt batchSize $ drop offset distances
                          let batchInner = distancesManyToMany batch
                          let recursionTerminated = null subSet
                          let (batchToSubSet, newOffset) = if (recursionTerminated)
                                              then (0, Nothing)
                                              else (distancesSetToSet batch subSet, Just (offset+batchSize))
                          return (batchInner+batchToSubSet, newOffset)
                          where
                            readDistances h = liftM ( (map textRead) ) $ hListToEOF Text.hGetLine h


distancesSetToSet :: [Integer] -> [Integer] -> Integer
distancesSetToSet xs ys = sum $ map (\one->distancesOneToMany one xs) ys

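On a 2k-line file with a batch size of 500, it finished in 2.16 secs, with 2.2 GB of allocation and around 6 MB of required space. That is 4 times the space of the simplest version! It may be a coincidence, but there are also 4 batches in use...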
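For reference, the decomposition that the batch version relies on (pairs inside the batch, plus the batch against everything after it, then the same again on the remainder) can be checked on small in-memory lists. The following is a minimal sketch and not the code from the post: `distancesBatched` is a hypothetical helper, and `distance` is assumed to be symmetric (`abs`), which the posted `(a-b)` is not.

```haskell
-- Pure, in-memory sketch of the batch decomposition (no file IO).
-- Assumption: distance is symmetric (abs); the posted version uses (a-b).
distance :: Integer -> Integer -> Integer
distance a b = abs (a - b)

distancesOneToMany :: Integer -> [Integer] -> Integer
distancesOneToMany one = sum . map (distance one)

-- Naive version: every element against all later elements.
distancesManyToMany :: [Integer] -> Integer
distancesManyToMany (x:xs) = distancesOneToMany x xs + distancesManyToMany xs
distancesManyToMany []     = 0

-- Batched version (hypothetical helper): pairs inside the batch, plus the
-- batch against everything after it, then recurse on the remainder.
distancesBatched :: Int -> [Integer] -> Integer
distancesBatched batchSize xs
  | null xs   = 0
  | otherwise = inner + cross + distancesBatched batchSize rest
  where
    -- max 1 guards against a non-positive batch size looping forever
    (batch, rest) = splitAt (max 1 batchSize) xs
    inner = distancesManyToMany batch
    cross = sum [distancesOneToMany y batch | y <- rest]
```

With the signed `distance a b = (a-b)` as posted, `distancesSetToSet batch subSet` evaluates later minus earlier, while `distancesManyToMany` evaluates earlier minus later, so the two versions would only be expected to agree for a symmetric distance.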

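What surprised me is that all of the required space is consumed at the start, and afterwards the space in use only decreases. This becomes a problem with a 50k-line file (500 MB), because it then runs out of memory.

My question is: why does the batch solution consume more memory? It seems to keep the whole file in memory for each batch, even though it should (at least that is my intention) only keep a single batch in memory.

EDIT: I removed the details for the 6k-line file with 500-line batches (I had taken the wrong profile file).

In addition, here is the space profile generated with the 2k-line file and 500-line batches: (image: space profile of the batch solution on the 2k-line file, 840 KB)

EDIT2: Profiling with retainers resulted in: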

total time  =        2.24 secs   (112 ticks @ 20 ms)
total alloc = 2,126,803,896 bytes  (excludes profiling overheads)

COST CENTRE                    MODULE               %time %alloc

textRead                       MapReduceTestStrictStrings  47.3   44.4
distance                       MapReduceTestStrictStrings  25.9   25.3
distancesOneToMany             MapReduceTestStrictStrings  18.8   29.5
singleResultBatch              MapReduceTestStrictStrings   4.5    0.0
readTextDevice                 Data.Text.IO.Internal   2.7    0.0


                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc

MAIN                     MAIN                                                   1           0   0.0    0.0   100.0  100.0
 main                    Main                                                1604           2   0.0    0.0   100.0  100.0
  sumOfDistancesOnBigFileUsingBatches MapReduceTestStrictStrings                          1605           4   0.0    0.0   100.0  100.0
   singleResultBatch     MapReduceTestStrictStrings                          1606          20   4.5    0.0   100.0  100.0
    distancesSetToSet    MapReduceTestStrictStrings                          1615           3   0.0    0.0    34.8   43.3
     distancesOneToMany  MapReduceTestStrictStrings                          1616        3000  14.3   23.2    34.8   43.2
      distance           MapReduceTestStrictStrings                          1617     1500000  20.5   20.0    20.5   20.0
    textRead             MapReduceTestStrictStrings                          1614        5000  47.3   44.4    47.3   44.4
    distancesManyToMany  MapReduceTestStrictStrings                          1611        2004   0.0    0.0     9.8   11.7
     distancesOneToMany  MapReduceTestStrictStrings                          1612        2000   4.5    6.3     9.8   11.6
      distance           MapReduceTestStrictStrings                          1613      499000   5.4    5.3     5.4    5.3
    hListToEOF           MapReduceTestStrictStrings                          1609       23996   0.9    0.6     3.6    0.6
     readTextDevice      Data.Text.IO.Internal                               1610        1660   2.7    0.0     2.7    0.0
 CAF:main4               Main                                                1591           1   0.0    0.0     0.0    0.0
 CAF:main5               Main                                                1590           1   0.0    0.0     0.0    0.0
  main                   Main                                                1608           0   0.0    0.0     0.0    0.0
 CAF                     GHC.Num                                             1580           1   0.0    0.0     0.0    0.0
 CAF                     GHC.IO.Handle.FD                                    1526           2   0.0    0.0     0.0    0.0
 CAF                     GHC.IO.FD                                           1510           2   0.0    0.0     0.0    0.0
 CAF                     System.Event.Thread                                 1508           3   0.0    0.0     0.0    0.0
 CAF                     GHC.IO.Encoding.Iconv                               1487           2   0.0    0.0     0.0    0.0
 CAF                     System.Event.Internal                               1486           2   0.0    0.0     0.0    0.0
 CAF                     System.Event.Unique                                 1483           1   0.0    0.0     0.0    0.0
 CAF                     GHC.Conc.Signal                                     1480           1   0.0    0.0     0.0    0.0
 CAF                     Data.Text.Internal                                   813           1   0.0    0.0     0.0    0.0
 CAF                     Data.Text.Array                                      811           1   0.0    0.0     0.0    0.0

Retainer sets created during profiling:
SET 2 = {<MAIN.SYSTEM>}
SET 3 = {<MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 15 = {<GHC.IO.FD.CAF>}
SET 17 = {<System.Event.Thread.CAF>}
SET 18 = {<>}
SET 44 = {<GHC.IO.Handle.FD.CAF>}
SET 47 = {<GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>}
SET 56 = {<GHC.Conc.Signal.CAF>}
SET 57 = {<>, <MAIN.SYSTEM>}
SET 66 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 67 = {<System.Event.Thread.CAF>, <>, <MAIN.SYSTEM>}
SET 72 = {<GHC.Conc.Sync.CAF>, <MAIN.SYSTEM>}
SET 76 = {<MapReduceTestStrictStrings.hListToEOF,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 81 = {<GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 83 = {<GHC.IO.Encoding.Iconv.CAF>, <GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 86 = {<GHC.Conc.Signal.CAF>, <>}
SET 95 = {<MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 96 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 100 = {<MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.hListToEOF,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 102 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesManyToMany,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 103 = {<MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 136 = {<GHC.IO.Handle.FD.CAF>, <MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 143 = {<GHC.Conc.Sync.CAF>, <GHC.IO.Handle.FD.CAF>, <MAIN.SYSTEM>}
SET 144 = {<MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 145 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 146 = {<MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 147 = {<MAIN.SYSTEM>, <MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}
SET 148 = {<MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>, <MapReduceTestStrictStrings.distancesOneToMany,MapReduceTestStrictStrings.distancesSetToSet,MapReduceTestStrictStrings.singleResultBatch,MapReduceTestStrictStrings.sumOfDistancesOnBigFileUsingBatches,Main.main>}

And the following .hp image: (image: retainer heap profile for the 2k-line file and 500-line batches)
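The entry counts in the cost-centre report above can be cross-checked with a little arithmetic. The sketch below uses hypothetical helper names and assumes the batch size divides the line count, as with 2000 lines and batches of 500. It reproduces the 5000 entries of `textRead`, the 499000 entries of `distance` under `distancesManyToMany`, and the 1500000 entries of `distance` under `distancesSetToSet`.

```haskell
-- Expected call counts for n lines processed in batches of size b.
-- Assumption: b > 0 and b divides n (e.g. n = 2000, b = 500).

-- Offsets at which a batch starts: 0, b, 2b, ...
batchOffsets :: Int -> Int -> [Int]
batchOffsets n b = takeWhile (< n) [0, b ..]

-- Each pass parses only the lines that survive `drop offset`,
-- because `map textRead` is lazy.
expectedParses :: Int -> Int -> Int
expectedParses n b = sum [n - off | off <- batchOffsets n b]

-- Pairs inside each batch: b*(b-1)/2 per batch.
expectedInnerDistances :: Int -> Int -> Int
expectedInnerDistances n b = length (batchOffsets n b) * (b * (b - 1) `div` 2)

-- Batch against the rest of the file: b * (lines after the batch).
expectedCrossDistances :: Int -> Int -> Int
expectedCrossDistances n b = sum [b * (n - off - b) | off <- batchOffsets n b]
```

The inner and cross counts add up to 2000 * 1999 / 2 = 1999000 pairs, so the batch version does the same number of comparisons as the simple one. The extra cost is in the four passes over the file, each of which still reads every line through `hListToEOF`.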

EDIT3: The code above all used the following modules:

Data.Text.IO
Data.Text
Data.Text.Read

When I use their lazy versions, the total time/memory/space usage does not really change: 2.62 seconds, 2.25 GB of allocation and 5.5 MB of space.

Accepted solution: the lazy versions did not help because hListToEOF forces a complete read of the file (I had expected the `:` constructor to work lazily).
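The reason is that in IO, `rest <- hListToEOF func h` has to run to completion before `return $ element:rest` can hand back the first cons cell. The sketch below illustrates this with a counter in place of a file handle. `readNext`, `strictList`, `lazyList` and `readsBeforeFirst` are hypothetical names introduced only for this illustration; the lazy variant uses `unsafeInterleaveIO` from base to defer the tail.

```haskell
import Data.IORef (IORef, newIORef, readIORef, modifyIORef')
import System.IO.Unsafe (unsafeInterleaveIO)

-- Stand-in for "read one line": bumps a counter and returns its new value.
-- (Hypothetical helper; the code in the post calls Text.hGetLine on a Handle.)
readNext :: IORef Int -> IO Int
readNext ref = modifyIORef' ref (+ 1) >> readIORef ref

-- Same shape as hListToEOF: the recursive action runs to completion
-- before `return` can hand back the first cons cell.
strictList :: Int -> IORef Int -> IO [Int]
strictList limit ref = do
  element <- readNext ref
  rest <- if element >= limit then return [] else strictList limit ref
  return (element : rest)

-- Lazy variant: the tail is deferred until a consumer demands it.
lazyList :: Int -> IORef Int -> IO [Int]
lazyList limit ref = do
  element <- readNext ref
  rest <- if element >= limit
            then return []
            else unsafeInterleaveIO (lazyList limit ref)
  return (element : rest)

-- Number of reads performed by the time the first element is available.
readsBeforeFirst :: (Int -> IORef Int -> IO [Int]) -> Int -> IO Int
readsBeforeFirst build limit = do
  ref <- newIORef 0
  xs <- build limit ref
  case xs of
    (x:_) -> x `seq` readIORef ref
    []    -> readIORef ref
```

In GHCi, `readsBeforeFirst strictList 1000` reports that all 1000 reads have happened before the first element can be inspected, whereas `readsBeforeFirst lazyList 1000` reports a single read. The strict shape is what produces the initial memory peak.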

The solution is to use the following imports:

import qualified Data.ByteString.Char8 as Str
import qualified Data.Text.Lazy.IO as TextIO
import qualified Data.Text.Lazy as T 
import Data.Text.Lazy.Read 

and to make the following modification in the singleResultBatch function:

                            readDistances = liftM  ( (map textRead . T.lines)) $ TextIO.readFile path

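After that, neither the speed (2.72 s) nor the memory allocation (2.3 GB) changes, which is expected.

The heap profile (space usage) does improve (1.8 MB instead of 5.5 MB), as shown here: (image: heap profile when using the Text.Lazy package)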

Don*_*art 3

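You need to process the data incrementally. Currently, hListToEOF reads all the data in one go, and you then process it slowly (hence the initial memory spike as everything is read in, followed by a slow reduction as the list is deallocated).

Instead of doing your own IO via hListToEOF, read/stream the file lazily (e.g. with readFile from the Text.Lazy library) and map your processing functions over it.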
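A minimal sketch of that suggestion, under some assumptions: `parseLine` is a hypothetical stand-in for the question's `textRead` (which is not shown), the distance is taken as `abs`, and `batchPass` and `batchPassFromFile` are illustrative names rather than code from the post. The idea is that only the current batch is retained, while the later lines are intended to be consumed one at a time as the lazy text is read.

```haskell
import Data.List (foldl', tails)
import qualified Data.Text.Lazy as TL
import qualified Data.Text.Lazy.IO as TLIO
import Data.Text.Lazy.Read (decimal)

-- Parse one line as a non-negative integer; unparsable lines count as 0.
-- (Hypothetical stand-in for textRead.)
parseLine :: TL.Text -> Integer
parseLine t = case decimal t of
  Right (n, _) -> n
  Left _       -> 0

-- Lazily produced list of numbers, one per line.
parseNumbers :: TL.Text -> [Integer]
parseNumbers = map parseLine . TL.lines

-- One pass: pairs inside the batch plus the batch against all later lines.
-- Assumption: distance is abs (x - y).
batchPass :: Int -> Int -> TL.Text -> Integer
batchPass batchSize offset contents = inner + cross
  where
    (batch, rest) = splitAt batchSize (drop offset (parseNumbers contents))
    inner = sum [abs (x - y) | (x:ys) <- tails batch, y <- ys]
    -- strict left fold so the running total does not build up thunks
    cross = foldl' (\acc y -> acc + sum (map (abs . subtract y) batch)) 0 rest

-- Lazy read of the file; the result is forced before returning so the
-- whole file has been consumed by the time the caller continues.
batchPassFromFile :: FilePath -> Int -> Int -> IO Integer
batchPassFromFile path batchSize offset = do
  contents <- TLIO.readFile path
  return $! batchPass batchSize offset contents
```

A driver in the style of `sumOfDistancesOnBigFileUsingBatches` would call `batchPassFromFile` once per offset and add up the results.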