Mas*_*sse 5 io concurrency haskell heap-memory
我是Haskell的初学者,并认为这将是一个很好的锻炼.我有一个分配,我需要在线程A中读取文件,处理线程B_i中的文件行,然后在线程C中输出结果.
我已经实现了这一点,但其中一个要求是我们不能相信整个文件适合内存.我希望懒惰的IO和垃圾收集器能为我做到这一点,但唉,内存使用量不断上升和上升.
读者线程(A)读取文件,readFile然后用行号压缩并用Just包装.然后写入这些压缩的行Control.Concurrent.Chan.每个消费者线程B都有自己的渠道.
每个消费者在有数据时读取他们自己的频道,如果正则表达式匹配,则将其输出到包含在Maybe(由列表组成)中的各自的输出通道.
打印机检查每个B线程的输出通道.如果结果(行)都不是Nothing,则打印该行.因为在这一点上应该没有引用较旧的行,我认为垃圾收集器能够释放这些行,但是我似乎在这里错了.
.lhs文件位于:http://gitorious.org/hajautettujen-sovellusten-muodostamistekniikat/hajautettujen-sovellusten-muodostamistekniikat/blobs/master/mgrep.lhs
所以问题是,我如何限制内存使用,或允许垃圾收集器删除行.
根据要求提供的片段.希望缩进不会被严重破坏:)
data Global = Global {done :: MVar Bool, consumers :: Consumers}
type Done = Bool
type Linenum = Int
type Line = (Linenum, Maybe String)
type Output = MVar [Line]
type Input = Chan Line
type Consumers = MVar (M.Map ThreadId (Done, (Input, Output)))
type State a = ReaderT Global IO a
producer :: [Input] -> FilePath -> State ()
producer c p = do
liftIO $ Main.log "Starting producer"
d <- asks done
f <- liftIO $ readFile p
mapM_ (\l -> mapM_
(liftIO . flip writeChan l) c)
$ zip [1..] $ map Just $ lines f
liftIO $ modifyMVar_ d (return . not)
printer :: State ()
printer = do
liftIO $ Main.log "Starting printer"
c <- (fmap (map (snd . snd) . M.elems)
(asks consumers >>= liftIO . readMVar))
uniq' c
where head' :: Output -> IO Line
head' ch = fmap head (readMVar ch)
tail' = mapM_ (liftIO . flip modifyMVar_
(return . tail))
cont ch = tail' ch >> uniq' ch
printMsg ch = readMVar (head ch) >>=
liftIO . putStrLn . fromJust . snd . head
cempty :: [Output] -> IO Bool
cempty ch = fmap (any id)
(mapM (fmap ((==) 0 . length) . readMVar ) ch)
{- Return false unless none are Nothing -}
uniq :: [Output] -> IO Bool
uniq ch = fmap (any id . map (isNothing . snd))
(mapM (liftIO . head') ch)
uniq' :: [Output] -> State ()
uniq' ch = do
d <- consumersDone
e <- liftIO $ cempty ch
if not e
then do
u <- liftIO $ uniq ch
if u then cont ch else do
liftIO $ printMsg ch
cont ch
else unless d $ uniq' ch
Run Code Online (Sandbox Code Playgroud)
并发编程不提供任何已定义的执行顺序,除非您自己使用mvar等强制执行.因此,在任何消费者将其读取并传递之前,生产者线程可能会将所有/大部分线路粘贴在chan中.另一个符合要求的架构就是让线程A调用惰性读取文件并将结果粘贴到mvar中.然后每个消费者线程获取mvar,读取一行,然后在继续处理该行之前替换mvar.即使这样,如果输出线程无法跟上,那么存储在chan上的匹配行数可以任意增加.
你所拥有的是推送架构.要真正使其在恒定的空间中发挥作用,请考虑需求驱动.找到一种机制,使输出线程向处理线程发出信号,表明它们应该做某事,并且处理线程向读取器线程发出信号,表明它们应该做某事.
另一种方法是使用有限大小的chans - 因此读取器线程在处理器线程没有赶上时会阻塞,因此处理器线程在输出线程没有赶上时就会阻塞.
总的来说,这个问题实际上让我想起了Tim Bray的宽边基准,尽管要求有些不同.无论如何,它引发了关于实现多核grep的最佳方式的广泛讨论.最重要的是问题是IO绑定,并且您希望多个读取器线程超过mmapped文件.
请参阅此处了解更多信息:http://www.tbray.org/ongoing/When/200x/2007/09/20/Wide-Finder
| 归档时间: |
|
| 查看次数: |
254 次 |
| 最近记录: |