使用Cassava在内存中加载CSV

Ant*_*ton 6 memory csv haskell

我试图将CSV加载到内存中作为矢量与木薯的矢量.我的程序确实有效,但是对于50MB的csv文件使用了大量的内存,我不明白为什么.

我知道使用Data.Csv.Streaming应该对大文件更好,但我认为50MB仍然可以.我想这两个Data.Csv和Data.Csv.Streaming从GitHub的项目页面或多或少典型的例子,我也试图实现自己的解析器,输出矢量的矢量(我以我的attoparsec-CSV代码https://开头hackage.haskell.org/package/attoparsec-csv),所有这些解决方案都使用大约2000MB的内存!我确信我所做的事情有问题.这样做的正确方法是什么?

我的最终目标是将数据完全加载到内存中,以便以后进一步处理.例如,我可以将数据拆分为有趣的矩阵,并与使用Hmatrix的人一起工作.

以下是我尝试使用Cassava的2个程序:

1 /使用Data.Csv

import qualified Data.ByteString.Lazy as BL
import qualified Data.Vector as V
import Data.Csv
import Data.Foldable


main = do
   csv <- BL.readFile "train.csv"
   let Right res = decode HasHeader csv :: Either String (V.Vector(V.Vector(BL.ByteString)))
   print $ res V.! 0
Run Code Online (Sandbox Code Playgroud)

2 /使用Data.Csv.Streaming

{-# LANGUAGE BangPatterns #-}

import qualified Data.ByteString.Lazy as BL
import qualified Data.Vector as V
import Data.Csv.Streaming
import Data.Foldable


main = do
   csv <- BL.readFile "train.csv"
   let !a = decode HasHeader csv :: Records(V.Vector(BL.ByteString))
   let !res = V.fromList $ Data.Foldable.toList a
   print $ res V.! 0
Run Code Online (Sandbox Code Playgroud)

请注意,我没有给你我基于attoparsec-csv制作的程序,因为它与Vector而不是List几乎完全相同.该解决方案的内存使用率仍然很低.

有趣的是,在Data.Csv.Streaming解决方案中,如果我只是使用Data.Foldable.for_打印我的数据,那么一切都超快,内存使用量为2MB.这让我觉得我的问题与我构建Vector的方式有关.可能累积thunk而不是将原始数据堆叠成紧凑的数据结构.

谢谢您的帮助,

安托万

Mic*_*ael 6

Data.CSV和之间的差异Data.CSV.Streaming可能不是你所期望的.Data.Vector.Vector如您所见,第一个产生csv内容.我不确定为什么这个向量的构造应该占用这么多的空间 - 尽管当我反映出由此产生的指针向量指向矢量指向懒惰时,它开始不让我感到惊讶这里的bytestrings包含28203420个与lazy bytestrings不同的指针,每行371个,每个指向原始字节流的一小部分,通常为'0'.关注http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html这意味着原始字节流中的典型双字节序列 - 几乎所有字节都看起来像这样:",0"即.[44,48]-被替换由多个指针和构造的:单独的懒惰字节串含量使得每对的字节占用像11个字(在ChunkEmpty构造要懒惰字节串,加上材料其中J Tibell使在9个字严格字节串)...加上原始字节(减去表示逗号和空格的字节).在64位系统中,这是一个非常巨大的升级规模.

Data.CSV.Streaming并不是真的如此不同:基本上它构造了一个稍微装饰的列表而不是矢量,所以原则上它可以被懒惰地评估,并且在理想情况下,整个事情不需要在内存中实现,正如您所注意到的那样.在这样的背景下单子,但是,你会,这是不是"从IO提取列表" 相当保证产生混乱和无秩序.

如果要正确传输csv内容,则应使用其中一个流式库.(我没有建议把整个东西放到内存中,除了显而易见的一个,就是安排cassava将每一行读成一个很好的紧凑数据类型,而不是一个指向惰性字节串的指针向量;这里虽然我们有371个"字段").

所以这是你的程序使用cassava-streams,它使用cassava(真正的)增量接口,然​​后用于io-streams创建记录流:

  {-# LANGUAGE BangPatterns #-}

  import qualified Data.ByteString.Lazy as BL
  import qualified Data.Vector as V
  import Data.Foldable
  import System.IO.Streams (InputStream, OutputStream)
  import qualified System.IO.Streams as Streams
  import qualified System.IO.Streams.Csv as CSV
  import System.IO

  type StreamOfCSV = InputStream (V.Vector(BL.ByteString))

  main = withFile "train.csv" ReadMode $ \h -> do
     input          <- Streams.handleToInputStream h 
     raw_csv_stream <- CSV.decodeStream HasHeader input
     csv_stream     <- CSV.onlyValidRecords raw_csv_stream :: IO StreamOfCSV
     m <- Streams.read csv_stream
     print m
Run Code Online (Sandbox Code Playgroud)

这样就可以立即使用,而不是hello-world打印第一条记录.您可以在教程源代码中看到更多操作https://github.com/pjones/cassava-streams/blob/master/src/System/IO/Streams/Csv/Tutorial.hs其他流媒体库也有类似的库.如果你想要构成的数据结构(如矩阵)可以适合内存,你应该能够通过折叠线来构造它,Streams.fold如果你试图从每一行中提取的信息是,那么应该没有问题.在折叠操作消耗之前进行适当评估.如果你可以安排木薯输出带有未装箱字段的非递归数据结构,那么就可以为该类型编写一个Unbox实例,并将整个csv折叠成一个紧密包装的未装箱的矢量.在这种情况下,每行有371个不同的字段,因此我猜不是一个选项.

以下是该Data.CSV.Streaming计划的等效内容:

  main = withFile "train.csv" ReadMode $ \h -> do
    input          <- Streams.handleToInputStream h 
    raw_csv_stream <- CSV.decodeStream HasHeader input
    csv_stream     <- CSV.onlyValidRecords raw_csv_stream :: IO StreamOfCSV
    csvs <- Streams.toList csv_stream
    print (csvs !! 0)
Run Code Online (Sandbox Code Playgroud)

它有同样的麻烦,因为它Streams.toList在尝试找出第一个元素之前用来收集巨大的列表.

- 附录

这里值得的是一个pipes-csv变体,它只是将每个解析后的行压缩成一个未装箱的Ints 向量(这比查找更容易Doubles,这就是这个csv实际存储的内容,使用readIntbytestring包.)

import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U
import Data.Csv

import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as Bytes
import Pipes
import qualified Pipes.Csv as Csv
import System.IO
import Control.Applicative

import qualified Control.Foldl as L

main = withFile "train.csv" ReadMode $ \h -> do
  let csvs :: Producer (V.Vector ByteString) IO ()
      csvs = Csv.decode HasHeader (Bytes.fromHandle h) >-> P.concat
      -- shamelessly reading integral part only, counting bad parses as 0
      simplify bs = case B.readInt bs of
        Nothing       -> 0
        Just (n, bs') -> n
      uvectors :: Producer (U.Vector Int) IO ()
      uvectors = csvs  >-> P.map (V.map simplify) >-> P.map (V.foldr U.cons U.empty)
  runEffect $ uvectors >-> P.print
Run Code Online (Sandbox Code Playgroud)

您可以使用foldl库中的折叠折叠行,或者通过更换最后一行来完成这样的操作

  let myfolds = liftA3 (,,) (L.generalize (L.index 13))   -- the thirteenth row, if it exists
                            (L.randomN 3)   -- three random rows
                            (L.generalize L.length) -- number of rows
  (thirteen,mvs,len) <- L.impurely P.foldM myfolds uvectors

  case mvs of 
    Nothing -> return ()
    Just vs -> print (vs :: V.Vector (U.Vector Int))
  print thirteen
  print len
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我正在收集第十三行,三条随机行和记录总数 - 任何数量的其他折叠都可以与这些相结合.特别是,我们也可以将所有行收集到一个巨大的向量中L.vector,考虑到这个csv文件的大小,这可能仍然是一个坏主意.下面我们回到起点,我们收集所有内容并打印完成的矢量矢量的第17行,即一种大矩阵.

  vec_vec <- L.impurely P.foldM  L.vector uvectors
  print $ (vec_vec :: V.Vector (U.Vector Int)) V.! 17
Run Code Online (Sandbox Code Playgroud)

这需要大量的内存,但并不会给我的小笔记本电脑带来特别的压力.