减少Haskell程序中的垃圾收集暂停时间

jam*_*her 125 performance garbage-collection haskell latency ghc

我们正在开发一个程序,它接收和转发"消息",同时保留这些消息的临时历史记录,以便它可以告诉您消息历史记录(如果请求).消息以数字方式标识,通常大小约为1千字节,我们需要保留数十万条这些消息.

我们希望优化此程序的延迟:发送和接收消息之间的时间必须低于10毫秒.

该程序是用Haskell编写的,并用GHC编译.但是,我们发现垃圾收集暂停对于我们的延迟要求来说太长了:在我们的实际程序中超过100毫秒.

以下程序是我们的应用程序的简化版本.它使用a Data.Map.Strict来存储消息.消息ByteString由a标识Int.以递增的数字顺序插入1,000,000条消息,并且不断删除最旧的消息以使历史记录最多保留200,000条消息.

module Main (main) where

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if 200000 < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main :: IO ()
main = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])
Run Code Online (Sandbox Code Playgroud)

我们编译并运行此程序使用:

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 7.10.3
$ ghc -O2 -optc-O3 Main.hs
$ ./Main +RTS -s
   3,116,460,096 bytes allocated in the heap
     385,101,600 bytes copied during GC
     235,234,800 bytes maximum residency (14 sample(s))
     124,137,808 bytes maximum slop
             600 MB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0      6558 colls,     0 par    0.238s   0.280s     0.0000s    0.0012s
  Gen  1        14 colls,     0 par    0.179s   0.250s     0.0179s    0.0515s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.652s  (  0.745s elapsed)
  GC      time    0.417s  (  0.530s elapsed)
  EXIT    time    0.010s  (  0.052s elapsed)
  Total   time    1.079s  (  1.326s elapsed)

  %GC     time      38.6%  (40.0% elapsed)

  Alloc rate    4,780,213,353 bytes per MUT second

  Productivity  61.4% of total user, 49.9% of total elapsed
Run Code Online (Sandbox Code Playgroud)

这里重要的指标是"最大暂停"0.0515s,或51毫秒.我们希望将其减少至少一个数量级.

实验表明,GC暂停的长度由历史记录中的消息数决定.这种关系大致是线性的,或者可能是超线性的.下表显示了这种关系.(您可以在此处查看我们的基准测试,以及此处的一些图表.)

msgs history length  max GC pause (ms)
===================  =================
12500                                3
25000                                6
50000                               13
100000                              30
200000                              56
400000                             104
800000                             199
1600000                            487
3200000                           1957
6400000                           5378
Run Code Online (Sandbox Code Playgroud)

我们已经尝试了其他几个变量来确定它们是否可以减少这种延迟,但这些变量都不会产生很大的影响.这些不重要的变量包括:优化(-O,-O2); RTS GC选项(-G,-H,-A,-c),芯(的数目-N),不同的数据结构(Data.Sequence),消息的大小,并且产生短暂的垃圾的量.压倒性的决定因素是历史中的消息数量.

我们的工作理论是暂停是消息数量的线性因为每个GC循环必须遍历所有工作的可访问内存并复制它,这显然是线性操作.

问题:

  • 这种线性时间理论是否正确?可以用这种简单的方式表达GC暂停的长度,还是现实更复杂?
  • 如果GC暂停在工作存储器中是线性的,有没有办法减少所涉及的常数因素?
  • 是否有增量GC的任何选项,或类似的东西?我们只能看到研究论文.我们非常愿意以更低的延迟交换吞吐量.
  • 有没有什么方法可以为更小的GC循环"分区"内存,而不是分成多个进程?

Sim*_*low 91

实际上你有一个51ms的暂停时间和超过200Mb的实时数据.我工作的系统具有更大的最大暂停时间,其中一半的实时数据.

您的假设是正确的,主要的GC暂停时间与实时数据的数量成正比,遗憾的是GHC没有办法解决这个问题.我们在过去尝试过增量GC,但这是一个研究项目,并没有达到将其折叠到已发布的GHC所需的成熟度.

我们希望将来有一件事可以帮助解决这个问题:紧凑型地区:https://phabricator.haskell.org/D1264.它是一种手动内存管理,您可以在其中压缩堆中的结构,并且GC不必遍历它.它最适合长寿命数据,但也许它足以用于您设置中的单个消息.我们的目标是在GHC 8.2.0中使用它.

如果您处于分布式设置并且具有某种类型的负载平衡器,则可以使用一些技巧来避免暂停命中,您基本上确保负载均衡器不会向即将发生的计算机发送请求做一个主要的GC,当然要确保机器仍然完成GC,即使它没有得到请求.

  • 嗨西蒙,非常感谢你的详细回复!这是个坏消息,但关闭会很好.我们目前正在向可变实施方向发展,这是唯一合适的替代方案.我们不理解的一些事情:(1)负载平衡方案涉及哪些技巧 - 它们是否涉及手动`performGC`?(2)为什么用`-c`压缩会表现得更差 - 我们认为因为它找不到很多可以留在原地的东西?(3)有关于契约的更多细节吗?这听起来很有意思,但不幸的是,未来我们有点太过考虑了. (13认同)
  • @mljrg,您可能对http://www.well-typed.com/blog/2019/10/nonmoving-gc-merge/ (2认同)

mgm*_*ier 9

我已经尝试使用ringbuffer方法IOVector作为底层数据结构的代码片段.在我的系统(GHC 7.10.3,相同的编译选项)上,这导致最大时间(您在OP中提到的度量)减少了约22%.

NB.我在这里做了两个假设:

  1. 一个可变数据结构是适合该问题的(我猜消息传递意味着IO无论如何)
  2. 你的messageId是连续的

使用一些额外的Int参数和算法(比如当messageId被重置为0或者minBound)时,应该直接确定某个消息是否仍在历史记录中并从环形缓冲区中的相应索引中检索它.

为了您的测试乐趣:

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import qualified Data.Map.Strict as Map

import qualified Data.Vector.Mutable as Vector

data Msg = Msg !Int !ByteString.ByteString

type Chan = Map.Map Int ByteString.ByteString

data Chan2 = Chan2
    { next          :: !Int
    , maxId         :: !Int
    , ringBuffer    :: !(Vector.IOVector ByteString.ByteString)
    }

chanSize :: Int
chanSize = 200000

message :: Int -> Msg
message n = Msg n (ByteString.replicate 1024 (fromIntegral n))


newChan2 :: IO Chan2
newChan2 = Chan2 0 0 <$> Vector.unsafeNew chanSize

pushMsg2 :: Chan2 -> Msg -> IO Chan2
pushMsg2 (Chan2 ix _ store) (Msg msgId msgContent) =
    let ix' = if ix == chanSize then 0 else ix + 1
    in Vector.unsafeWrite store ix' msgContent >> return (Chan2 ix' msgId store)

pushMsg :: Chan -> Msg -> IO Chan
pushMsg chan (Msg msgId msgContent) =
  Exception.evaluate $
    let
      inserted = Map.insert msgId msgContent chan
    in
      if chanSize < Map.size inserted
      then Map.deleteMin inserted
      else inserted

main, main1, main2 :: IO ()

main = main2

main1 = Monad.foldM_ pushMsg Map.empty (map message [1..1000000])

main2 = newChan2 >>= \c -> Monad.foldM_ pushMsg2 c (map message [1..1000000])
Run Code Online (Sandbox Code Playgroud)

  • @jamesfisher:我实际上遇到了类似的问题,但决定将内存管理放在Haskell端.解决方案确实是一个环形缓冲区,它将原始数据的逐字节副本保存在单个连续的内存块中,从而产生单个Haskell值.看看这个[RingBuffer.hs要点](https://gist.github.com/mgmeier/d0febcc79e79b25155ac18180057ea16).我根据您的示例代码对其进行了测试,并且速度约为关键指标的90%.您可以在方便时使用代码. (11认同)
  • 嗨!很好的答案.我怀疑这个原因只能获得22%的加速,因为GC仍然需要在每个索引处使用`IOVector`和(不可变的,GC'd)值.我们目前正在研究使用可变结构重新实现的选项.它可能类似于你的环形缓冲系统.但是我们完全将它移到Haskell内存空间之外来进行我们自己的手动内存管理. (2认同)

小智 8

我必须同意其他人 - 如果你有严格的实时约束,那么使用GC语言并不理想.

但是,您可以考虑尝试其他可用的数据结构而不仅仅是Data.Map.

我使用Data.Sequence重写了它并获得了一些有希望的改进:

msgs history length  max GC pause (ms)
===================  =================
12500                              0.7
25000                              1.4
50000                              2.8
100000                             5.4
200000                            10.9
400000                            21.8
800000                            46
1600000                           87
3200000                          175
6400000                          350
Run Code Online (Sandbox Code Playgroud)

即使您正在优化延迟,我也注意到其他指标也在改善.在200000的情况下,执行时间从1.5秒降至0.2秒,总内存使用量从600MB降至27MB.

我应该注意到我通过调整设计来欺骗:

  • 我删除了IntMsg,所以它不是在两个地方.
  • 而不是使用地图从Ints到ByteStringS,我用SequenceByteStringS,和而不是一个Int每封邮件,我认为它可以用一个来完成Int整个Sequence.假设消息无法重新排序,您可以使用单个偏移量将您想要的消息转换为它在队列中的位置.

(我添加了一个额外的功能getMsg来证明这一点.)

{-# LANGUAGE BangPatterns #-}

import qualified Control.Exception as Exception
import qualified Control.Monad as Monad
import qualified Data.ByteString as ByteString
import Data.Sequence as S

newtype Msg = Msg ByteString.ByteString

data Chan = Chan Int (Seq ByteString.ByteString)

message :: Int -> Msg
message n = Msg (ByteString.replicate 1024 (fromIntegral n))

maxSize :: Int
maxSize = 200000

pushMsg :: Chan -> Msg -> IO Chan
pushMsg (Chan !offset sq) (Msg msgContent) =
    Exception.evaluate $
        let newSize = 1 + S.length sq
            newSq = sq |> msgContent
        in
        if newSize <= maxSize
            then Chan offset newSq
            else
                case S.viewl newSq of
                    (_ :< newSq') -> Chan (offset+1) newSq'
                    S.EmptyL -> error "Can't happen"

getMsg :: Chan -> Int -> Maybe Msg
getMsg (Chan offset sq) i_ = getMsg' (i_ - offset)
    where
    getMsg' i
        | i < 0            = Nothing
        | i >= S.length sq = Nothing
        | otherwise        = Just (Msg (S.index sq i))

main :: IO ()
main = Monad.foldM_ pushMsg (Chan 0 S.empty) (map message [1..5 * maxSize])
Run Code Online (Sandbox Code Playgroud)

  • 嗨!感谢您的回答.你的结果肯定仍然表现出线性减速,但你从"Data.Sequence"得到这样的加速非常有趣 - 我们测试了它,发现它实际上比Data.Map更糟糕!我不确定区别是什么,所以我要调查...... (4认同)

She*_*rsh 8

正如其他答案中提到的,GHC 中的垃圾收集器遍历实时数据,这意味着您在内存中存储的长期数据越多,GC 暂停时间就越长。

全球健康中心 8.2

为了部分解决这个问题,GHC-8.2 中引入了一个称为紧凑区域的功能。它既是 GHC 运行时系统的一个特性,也是一个公开方便的接口来使用的库。紧凑区域功能允许将您的数据放入内存中的一个单独位置,并且 GC 在垃圾收集阶段不会遍历它。因此,如果您有一个大型结构想要保留在内存中,请考虑使用紧凑区域。然而,紧凑区域本身没有 迷你垃圾收集器,它更适用于仅附加数据结构,而不是像HashMap您还想删除内容的地方。虽然你可以克服这个问题。有关详细信息,请参阅以下博客文章:

全球健康中心 8.10

此外,自 GHC-8.10 以来,实现了一种新的低延迟增量垃圾收集器算法。这是一种替代 GC 算法,默认情况下未启用,但您可以根据需要选择加入。因此,您可以将默认 GC 切换到更新的 GC 以自动获取紧凑区域提供的功能,而无需手动包装和展开。然而,新的 GC 不是灵丹妙药,不能自动解决所有问题,它有其权衡。有关新 GC 的基准测试,请参阅以下 GitHub 存储库: