读取文件时限制内存使用量

Mas*_*sse 5 io concurrency haskell heap-memory

我是Haskell的初学者,并认为这将是一个很好的锻炼.我有一个分配,我需要在线程A中读取文件,处理线程B_i中的文件行,然后在线程C中输出结果.

我已经实现了这一点,但其中一个要求是我们不能相信整个文件适合内存.我希望懒惰的IO和垃圾收集器能为我做到这一点,但唉,内存使用量不断上升和上升.

读者线程(A)读取文件,readFile然后用行号压缩并用Just包装.然后写入这些压缩的行Control.Concurrent.Chan.每个消费者线程B都有自己的渠道.

每个消费者在有数据时读取他们自己的频道,如果正则表达式匹配,则将其输出到包含在Maybe(由列表组成)中的各自的输出通道.

打印机检查每个B线程的输出通道.如果结果(行)都不是Nothing,则打印该行.因为在这一点上应该没有引用较旧的行,我认为垃圾收集器能够释放这些行,但是我似乎在这里错了.

.lhs文件位于:http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs

所以问题是,我如何限制内存使用,或允许垃圾收集器删除行.

根据要求提供的片段.希望缩进不会被严重破坏:)

data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a


producer :: [Input] -> FilePath -> State ()
producer c p = do
  liftIO $ Main.log "Starting producer"
  d <- asks done
  f <- liftIO $ readFile p
  mapM_ (\l -> mapM_
    (liftIO . flip writeChan l) c)
    $ zip [1..] $ map Just $ lines f
  liftIO $ modifyMVar_ d (return . not)

printer :: State ()
printer = do
  liftIO $ Main.log "Starting printer"
  c <- (fmap (map (snd . snd) . M.elems)
    (asks consumers >>= liftIO . readMVar))
  uniq' c
  where head' :: Output -> IO Line
    head' ch = fmap head (readMVar ch)

    tail' = mapM_ (liftIO . flip modifyMVar_
        (return . tail))

    cont ch = tail' ch >> uniq' ch

    printMsg ch = readMVar (head ch) >>=
        liftIO . putStrLn . fromJust . snd . head

    cempty :: [Output] -> IO Bool
    cempty ch = fmap (any id)
        (mapM (fmap ((==) 0 . length) . readMVar ) ch)

    {- Return false unless none are Nothing -}
    uniq :: [Output] -> IO Bool
    uniq ch = fmap (any id . map (isNothing . snd))
        (mapM (liftIO . head') ch)

    uniq' :: [Output] -> State ()
    uniq' ch = do
      d <- consumersDone
      e <- liftIO $ cempty ch
      if not e
        then  do
          u <- liftIO $ uniq ch
          if u then cont ch else do
        liftIO $ printMsg ch
        cont ch
          else unless d $ uniq' ch
Run Code Online (Sandbox Code Playgroud)

scl*_*clv 6

并发编程不提供任何已定义的执行顺序,除非您自己使用mvar等强制执行.因此,在任何消费者将其读取并传递之前,生产者线程可能会将所有/大部分线路粘贴在chan中.另一个符合要求的架构就是让线程A调用惰性读取文件并将结果粘贴到mvar中.然后每个消费者线程获取mvar,读取一行,然后在继续处理该行之前替换mvar.即使这样,如果输出线程无法跟上,那么存储在chan上的匹配行数可以任意增加.

你所拥有的是推送架构.要真正使其在恒定的空间中发挥作用,请考虑需求驱动.找到一种机制,使输出线程向处理线程发出信号,表明它们应该做某事,并且处理线程向读取器线程发出信号,表明它们应该做某事.

另一种方法是使用有限大小的chans - 因此读取器线程在处理器线程没有赶上时会阻塞,因此处理器线程在输出线程没有赶上时就会阻塞.

总的来说,这个问题实际上让我想起了Tim Bray的宽边基准,尽管要求有些不同.无论如何,它引发了关于实现多核grep的最佳方式的广泛讨论.最重要的是问题是IO绑定,并且您希望多个读取器线程超过mmapped文件.

请参阅此处了解更多信息:http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder

  • BoundedChan正在使用这种类型的hackage. (4认同)