Haskell快速并发队列

Woj*_*ilo 25 concurrency profiling haskell stm haskell-pipes

问题

你好!我正在编写一个日志库,我很乐意创建一个在单独的线程中运行的记录器,而所有应用程序线程都只是向它发送消息.我想为这个问题找到性能最佳的解决方案.我在这里需要简单的unboud队列.

途径

我已经创建了一些测试来查看可用解决方案的执行情况,我在这里得到了非常奇怪的结果.我测试了4个实现(下面提供的源代码)基于:

  1. 管道并发
  2. Control.Concurrent.Chan
  3. Control.Concurrent.Chan.Unagi
  4. 基于"Haskell中的并行和并发编程"一书中描述的MVar请注意,这种技术为我们提供了容量为1的有限队列 - 它仅用于测试

测试

以下是用于测试的源代码:

{-# LANGUAGE NoMonomorphismRestriction #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Pipes
import qualified Pipes.Concurrent as Pipes
import Control.Applicative
import Control.Monad (replicateM_)
import System.Environment (getArgs)

import Control.Concurrent.Chan
import Control.Concurrent (forkIO)
import qualified Control.Concurrent.Chan.Unagi as U
import Control.Concurrent.MVar
import Criterion.Main

data Event = Msg String | Status | Quit deriving (Show)

----------------------------------------------------------------------
-- Pipes
----------------------------------------------------------------------

pipesLogMsg = yield (Msg "hello")
pipesManyLogs num = replicateM_ num pipesLogMsg

pipesAddProducer num o = Pipes.forkIO $ do runEffect $ (pipesManyLogs num) >-> Pipes.toOutput o
                                           Pipes.performGC

pipesHandler max = loop 0
  where
    loop mnum = do
        if mnum == max
            then lift $ pure ()
            else do event <- await
                    case event of
                        Msg _  -> loop (mnum + 1)
                        Status -> (lift $ putStrLn (show mnum)) *> loop mnum
                        Quit   -> return ()

----------------------------------------------------------------------
-- Chan
----------------------------------------------------------------------

chanAddProducer num ch = forkIO $ chanManyLogs num ch
chanManyLogs num ch = replicateM_ num (writeChan ch (Msg "hello"))
chanHandler ch max = handlerIO (readChan ch) max

----------------------------------------------------------------------
-- Unagi-Chan
----------------------------------------------------------------------

uchanAddProducer num ch = forkIO $ uchanManyLogs num ch
uchanManyLogs num ch = replicateM_ num (U.writeChan ch (Msg "hello"))
uchanHandler ch max = handlerIO (U.readChan ch) max

----------------------------------------------------------------------
-- MVars
----------------------------------------------------------------------

mvarAddProducer num m = forkIO $ mvarManyLogs num m
mvarManyLogs num m = replicateM_ num (putMVar m (Msg "hello"))
mvarHandler m max = handlerIO (takeMVar m) max

----------------------------------------------------------------------
-- Utils
----------------------------------------------------------------------

handlerIO f max = loop 0 where
    loop mnum = do
        if mnum == max 
            then pure ()
            else do event <- f
                    case event of
                         Msg _  -> loop (mnum + 1)
                         Status -> putStrLn (show mnum) *> loop mnum
                         Quit   -> return ()

----------------------------------------------------------------------
-- Main
----------------------------------------------------------------------

main = defaultMain [
      bench "pipes" $ nfIO $ do
        (output, input) <- Pipes.spawn Pipes.Unbounded
        replicateM_ prodNum (pipesAddProducer msgNum output)
        runEffect $ Pipes.fromInput input >-> pipesHandler totalMsg
    , bench "Chan" $ nfIO $ do
        ch <- newChan
        replicateM_ prodNum (chanAddProducer msgNum ch)
        chanHandler ch totalMsg
    , bench "Unagi-Chan" $ nfIO $ do
        (inCh, outCh) <- U.newChan
        replicateM_ prodNum (uchanAddProducer msgNum inCh)
        uchanHandler outCh totalMsg
    , bench "MVar" $ nfIO $ do
        m <- newEmptyMVar
        replicateM_ prodNum (mvarAddProducer msgNum m)
        mvarHandler m totalMsg
    ]
  where
    prodNum  = 20
    msgNum   = 1000
    totalMsg = msgNum * prodNum
Run Code Online (Sandbox Code Playgroud)

你可以编译它,ghc -O2 Main.hs然后运行它.测试创建了20个消息生成器,每个生成器生成1000000条消息.

结果

benchmarking pipes
time                 46.68 ms   (46.19 ms .. 47.31 ms)
                     0.999 R²   (0.999 R² .. 1.000 R²)
mean                 47.59 ms   (47.20 ms .. 47.95 ms)
std dev              708.3 ?s   (558.4 ?s .. 906.1 ?s)

benchmarking Chan
time                 4.252 ms   (4.171 ms .. 4.351 ms)
                     0.995 R²   (0.991 R² .. 0.998 R²)
mean                 4.233 ms   (4.154 ms .. 4.314 ms)
std dev              244.8 ?s   (186.3 ?s .. 333.5 ?s)
variance introduced by outliers: 35% (moderately inflated)

benchmarking Unagi-Chan
time                 1.209 ms   (1.198 ms .. 1.224 ms)
                     0.996 R²   (0.993 R² .. 0.999 R²)
mean                 1.267 ms   (1.244 ms .. 1.308 ms)
std dev              102.4 ?s   (61.70 ?s .. 169.3 ?s)
variance introduced by outliers: 62% (severely inflated)

benchmarking MVar
time                 1.746 ms   (1.714 ms .. 1.774 ms)
                     0.997 R²   (0.995 R² .. 0.998 R²)
mean                 1.716 ms   (1.694 ms .. 1.739 ms)
std dev              73.99 ?s   (65.32 ?s .. 85.48 ?s)
variance introduced by outliers: 29% (moderately inflated)
Run Code Online (Sandbox Code Playgroud)

我很想问你为什么管道并发版本执行速度如此之慢以及为什么它比基于chan的速度慢得多.我很惊讶,MVar是所有版本中速度最快的 - 有人可以说更多,为什么我们得到这个结果,如果我们能在任何情况下做得更好?

jbe*_*man 17

所以我可以给你一些关于ChanTQueue(pipes-concurrency在这里内部使用)分析的一些概述,这些分析激发了一些设计决策unagi-chan.我不确定它是否会回答你的问题.我建议在进行基准测试时分叉不同的队列和玩变化,以便真正了解正在发生的事情.

Chan 好像:

data Chan a
 = Chan (MVar (Stream a)) -- pointer to "head", where we read from
        (MVar (Stream a)) -- pointer to "tail", where values written to

type Stream a = MVar (ChItem a)
data ChItem a = ChItem a (Stream a)
Run Code Online (Sandbox Code Playgroud)

这是MVars 的链接列表.类型中的两个MVars分别Chan充当指向列表当前头部和尾部的指针.这就是写的样子:

writeChan :: Chan a -> a -> IO () 
writeChan (Chan _ writeVar) val = do 
    new_hole <- newEmptyMVar   mask_ $ do
    old_hole <- takeMVar writeVar           -- [1]
    putMVar old_hole (ChItem val new_hole)  -- [2]
    putMVar writeVar new_hole               -- [3]
Run Code Online (Sandbox Code Playgroud)

在1处,写入者在写入端锁定,在2处,我们的项目a可供读者使用,而在3处,写入端被解锁为其他编写者.

这实际上在单用户/单生产者场景中表现相当好(参见此处的图表),因为读取和写入不会竞争.但是,一旦你有多个并发编写器,你就会遇到麻烦:

  • 当另一位作家在2时击中1的作家将阻止并被取消安排(我能够测量上下文切换的最快速度是~150ns(相当快);可能会出现速度慢得多的情况.因此,当你很多作家争你基本上是通过调度犯了一个大来回,到一个等待队列中MVar,然后终于写就可以完成.

  • 当一个作者在2时被取消安排(因为它超时)时,它会保持锁定状态,并且不允许任何写入完成,直到它可以重新安排为止; 当我们过度订阅时,即当我们的线程/核心比率很高时,这就成了一个问题.

最后,使用MVar-per项目需要配置方面的一些开销,更重要的是,当我们积累很多可变对象,我们可能会导致大量的GC压力.

TQUEUE

TQueue非常好,因为STM它使得它的正确性变得非常简单.它是一个功能性的队列式队列,它write包括简单地读取编写器堆栈,使用我们的元素并将其写回:

data TQueue a = TQueue (TVar [a])
                       (TVar [a])

writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _ write) a = do  
  listend <- readTVar write   -- a transaction with a consistent 
  writeTVar write (a:listend) -- view of memory
Run Code Online (Sandbox Code Playgroud)

如果在writeTQueue将其新堆栈写回之后,另一个交错写入执行相同操作,则将重试其中一个写入.随着越来越多的writeTQueues交错,争用的影响变得更加严重.然而,性能降低的速度要慢得多,Chan因为只有一个writeTVar操作可以使竞争对象无效writeTQueue,并且事务处理非常小(只是读取和a (:)).

读取通过从写入端"出列"堆栈,将其反转,并将反向堆栈存储在其自己的变量中以便于"弹出"(总共这给我们分摊O(1)推送和弹出)来工作

readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do writeTVar read xs'
                  return x
    [] -> do ys <- readTVar write
             case ys of
               [] -> retry
               _  -> case reverse ys of
                       [] -> error "readTQueue"
                       (z:zs) -> do writeTVar write []
                                    writeTVar read zs
                                    return z
Run Code Online (Sandbox Code Playgroud)

读者对作者有一个对称的温和争用问题.在一般情况下,读者和作者不会争辩,但是当读者堆栈耗尽时,读者会与其他读者和作者竞争.我怀疑如果您预先加载了TQueue足够的值,然后启动了4个读者和4个编写器,您可能会导致活锁,因为反向努力在下一次写入之前完成.值得注意的是,与其不同MVarTVar是,许多读者正在等待的写入会同时唤醒它们(这可能会或多或少地有效,具体取决于场景).

我怀疑你TQueue在测试中没有看到太多的弱点; 主要是你看到写入争用的中等影响以及许多可分配对象的大量分配和GC的开销.

鳗鱼瓒

unagi-chan首先是为了很好地处理争用而设计的.它在概念上非常简单,但实现有一些复杂性

data ChanEnd a = ChanEnd AtomicCounter (IORef (Int , Stream a))

data Stream a = Stream (Array (Cell a)) (IORef (Maybe (Stream a)))

data Cell a = Empty | Written a | Blocking (MVar a)
Run Code Online (Sandbox Code Playgroud)

读取和写入队列的两侧共享Stream它们协调传递值(从写入器到读取器)和阻塞指示(从读取器到写入器),并且读取和写入侧各自具有独立的原子计数器.写作如下:

  1. incrCounter编写器在写计数器上调用原子以接收与其(单个)读取器协调的唯一索引

  2. 作者找到它的单元格并执行CAS Written a

  3. 如果它成功退出,否则它会看到一个读者已经击败它并阻止(或继续阻止),所以它会(\Blocking v)-> putMVar v a)退出.

阅读以类似且明显的方式工作.

第一创新是使争用的点处的原子操作不处于争用降解(作为CAS /重试循环或陈状锁会).基于简单的基准测试和实验,atomic-primops库提供fetch-and-add primop效果最佳.

然后,在2两个读写器需要只进行一次比较并交换(读者快速路径是一个简单的非原子读)来完成协调.

所以,为了努力unagi-chan做好,我们

  • 使用fetch-and-add来处理争用点

  • 使用无锁技术,这样当我们被超额订阅时,在不合适的时间被调度的线程不会阻止其他线程的进度(被阻止的写入者最多可能阻塞由计数器"分配"给它的读者;阅读警告重新.异步异常在unagi-chandocs中,请注意Chan这里有更好的语义)

  • 使用数组来存储我们的元素,这些元素具有更好的局部性(但见下文),每个元素的开销更低,并且对GC的压力更小

最后一点注意事项.使用数组:并发写入数组通常是一个不好的扩展思路,因为你会导致很多缓存一致性流量,因为缓存行在你的编写器线程中来回无效.一般术语是"虚假分享".但是,我可以想到的替代设计也存在缓存方面的优势和缺点,这些设计可以划分写入或其他内容; 我一直在尝试这一点,但目前还没有任何结论.

我们合法地关注错误共享的一个地方是我们的计数器,我们对齐并填充到64个字节; 这确实出现在基准测试中,唯一的缺点是增加了内存使用量.

  • @PierreR:在这里!一个快速,可扩展的日志库供您使用!https://hackage.haskell.org/package/logger(或在github上:https://github.com/wdanilo/haskell-logger) (2认同)

Gab*_*lez 5

如果我猜的话,为什么pipes-concurrency执行更差,这是因为每一个读写是包裹在一个STM事务中,而其他库使用更有效的低级别的并发原语.

  • 那有什么理由,还是我们可以期待修复? (2认同)
  • 对于`pipes-concurrency`,API不太可能改变.该库强调易用性和正确性而非性能. (2认同)