Ell*_*ron 6 queue multithreading haskell ghc
我有一个相对简单的"复制"程序,它只是将一个文件的所有行复制到另一个文件.我玩弄Haskell的并发支持TMQueue
和STM
所以我想我会尝试这样的:
{-# LANGUAGE BangPatterns #-}
module Main where
import Control.Applicative
import Control.Concurrent.Async -- from async
import Control.Concurrent.Chan
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMQueue -- from stm-chans
import Control.Monad (replicateM, forM_, forever, unless)
import qualified Data.ByteString.Char8 as B
import Data.Function (fix)
import Data.Maybe (catMaybes, maybe)
import System.IO (withFile, IOMode(..), hPutStrLn, hGetLine)
import System.IO.Error (catchIOError)
input = "data.dat"
output = "out.dat"
batch = 100 :: Int
consumer :: TMQueue B.ByteString -> IO ()
consumer q = withFile output WriteMode $ \fh -> fix $ \loop -> do
!items <- catMaybes <$> replicateM batch readitem
forM_ items $ B.hPutStrLn fh
unless (length items < batch) loop
where
readitem = do
!item <- atomically $ readTMQueue q
return item
producer :: TMQueue B.ByteString -> IO ()
producer q = withFile input ReadMode $ \fh ->
(forever (B.hGetLine fh >>= atomically . writeTMQueue q))
`catchIOError` const (atomically (closeTMQueue q) >> putStrLn "Done")
main :: IO ()
main = do
q <- atomically newTMQueue
thread <- async $ consumer q
producer q
wait thread
Run Code Online (Sandbox Code Playgroud)
我可以像这样制作一个小测试输入文件
ghc -e 'writeFile "data.dat" (unlines (map show [1..5000000]))'
Run Code Online (Sandbox Code Playgroud)
并像这样构建它
ghc --make QueueTest.hs -O2 -prof -auto-all -caf-all -threaded -rtsopts -o q
Run Code Online (Sandbox Code Playgroud)
当我像这样运行它时./q +RTS -s -prof -hc -L60 -N2
,它说"2117 MB总内存在使用中"!但输入文件只有38 MB!
我是剖析的新手,但是我已经在图形之后生成了图形,并且无法查明我的错误.
正如OP指出的那样,现在我不妨写一个真正的答案。让我们从内存消耗开始。
两个有用的参考是Haskell 数据类型的内存占用和http://blog.johantibell.com/2011/06/memory-footprints-of-some-common-data.html。我们还需要查看一些结构的定义。
-- from http://hackage.haskell.org/package/stm-chans-3.0.0.2/docs/src/Control-Concurrent-STM-TMQueue.html
data TMQueue a = TMQueue
{-# UNPACK #-} !(TVar Bool)
{-# UNPACK #-} !(TQueue a)
deriving Typeable
-- from http://hackage.haskell.org/package/stm-2.4.3/docs/src/Control-Concurrent-STM-TQueue.html
-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
Run Code Online (Sandbox Code Playgroud)
该TQueue
实现使用具有读取端和写入端的标准功能队列。
让我们设置内存使用的上限,并假设我们在TMQueue
消费者执行任何操作之前将整个文件读入其中。在这种情况下,TQueue 的写入端将包含一个列表,每个输入行包含一个元素(存储为字节串)。每个列表节点看起来像
(:) bytestring tail
Run Code Online (Sandbox Code Playgroud)
需要 3 个单词(每个字段 1 个单词 + 构造函数 1 个单词)。每个字节串有 9 个字,因此将两者相加,每行有 12 个字的开销,不包括实际数据。您的测试数据为 500 万行,因此整个文件(加上一些常量)的开销为 6000 万字,在 64 位系统上约为 460MB(假设我的数学正确,总是有问题的)。添加 40MB 的实际数据,我们得到的值非常接近我在系统上看到的值。
那么,为什么我们的内存使用量接近这个上限呢?我有一个理论(调查作为练习!)。首先,生产者的运行速度可能比消费者快一点,因为读取通常比写入快(我使用的是旋转磁盘,也许 SSD 会有所不同)。这是 readTQueue 的定义:
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> case reverse ys of
[] -> error "readTQueue"
(z:zs) -> do writeTVar write []
writeTVar read zs
return z
Run Code Online (Sandbox Code Playgroud)
首先,我们尝试从读取端读取,如果该端为空,我们在反转该列表后尝试从写入端读取。
我认为发生的事情是这样的:当消费者需要从写端读取时,需要遍历STM事务内的输入列表。这需要一些时间,这将导致它与生产者竞争。随着生产者进一步前进,该列表变得更长,导致读取花费更多时间,在此期间生产者能够写入更多值,导致读取失败。这个过程会重复,直到生产者完成,然后消费者才有机会处理大量数据。这不仅会破坏并发性,还会增加更多的 CPU 开销,因为消费者事务不断重试并失败。
那么,鳗鱼呢?有几个关键的区别。首先,unagi-chan 在内部使用数组而不是列表。这会稍微减少开销。大部分开销来自 ByteString 指针,因此不多,但有一点。其次,unagi 保留数组块。即使我们悲观地假设生产者总是赢得竞争,在数组被填充后,它也会被从通道的生产者一侧推出。现在,生产者正在写入新数组,而消费者则从旧数组中读取。这种情况近乎理想;不存在共享资源争用,消费者具有良好的引用局部性,并且由于消费者正在处理不同的内存块,因此不存在缓存一致性问题。与我对 的理论描述不同TMQueue
,现在您将获得并发操作,允许生产者清除一些内存使用量,使其永远不会达到上限。
顺便说一句,我认为消费者分批没有好处。句柄已经由 IO 子系统缓冲了,所以我认为这不会带来任何好处。对我来说,当我将消费者更改为逐行操作时,性能略有提高。
现在,你能对这个问题做什么呢?根据我的工作假设(TMQueue
存在争用问题)以及您指定的要求,您只需要使用另一种类型的队列。显然鳗鱼效果很好。我也尝试过TMChan
,它比 unagi 慢约 25%,但使用的内存少 45%,所以这也是一个不错的选择。(这并不奇怪,TMChan
具有不同的结构,TMQueue
因此它会有不同的性能特征)
您还可以尝试更改算法,以便生产者发送多行块。这将降低所有字节字符串的内存开销。
那么,什么时候使用合适呢TMQueue
?如果生产者和消费者的速度差不多,或者消费者更快,应该没问题。此外,如果处理时间不均匀,或者生产者突发运行,您可能会获得良好的摊销性能。这几乎是最坏的情况,也许应该将其报告为针对stm
?的错误。我想如果读取功能改为
-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do writeTVar read xs'
return x
[] -> do ys <- readTVar write
case ys of
[] -> retry
_ -> do writeTVar write []
let (z:zs) = reverse ys
writeTVar read zs
return z
Run Code Online (Sandbox Code Playgroud)
这样就可以避免这个问题。现在z
和zs
绑定都应该被延迟评估,因此列表遍历将发生在该事务之外,从而允许读取操作有时在争用情况下成功。当然,假设我首先对这个问题的看法是正确的(并且这个定义足够懒惰)。但可能还有其他意想不到的缺点。