MailboxProcessor性能问题

thr*_*thr 9 .net f# functional-programming actor

我一直在尝试设计一个允许大量并发用户同时在内存中表示的系统.当我开始设计这个系统时,我立刻想到了某种基于actor的解决方案,这是Erlang的亲属.

系统必须在.NET中完成,所以我开始使用MailboxProcessor在F#中开发原型,但是遇到了严重的性能问题.我最初的想法是每个用户使用一个actor(MailboxProcessor)来为一个用户序列化通信通信.

我已经隔离了一小段代码,它重现了我所看到的问题:

open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv = 

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb = 

        Seq.initInfinite(fun id -> 
            MailboxProcessor<Message>.Start(fun inbox -> 

                let rec loop user =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | UpdateName(id, newName) ->
                            let user = {user with Name = newName};
                            incr.Increment()
                            do! loop user
                    }

                loop {Id = id; Name = sprintf "User%i" id}
            )
        ) 
        |> Seq.take 100000
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds
    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0
Run Code Online (Sandbox Code Playgroud)

在我的四核i7上创建100k演员需要大约800ms.然后将UpdateName消息提交给每个演员并等待他们完成大约需要1.8秒.

现在,我意识到所有队列都有开销:在ThreadPool上,在MailboxProcessor内部设置/重置AutoResetEvents等.但这真的是预期的表现吗?通过阅读MSDN和MailboxProcessor上的各种博客,我已经认识到它将成为erlang演员的亲戚,但从我看到的深渊表现来看,这在现实中似乎并不成立?

我还尝试了代码的修改版本,该代码使用8个MailboxProcessors,每个代码都包含一个Map<int, User>用于按id查找用户的映射,它产生了一些改进,将UpdateName操作的总时间降低到1.2秒.但它仍然感觉很慢,修改后的代码在这里:

open System.Threading;
open System.Diagnostics;

type Inc() =

    let mutable n = 0;
    let sw = new Stopwatch()

    member x.Start() =
        sw.Start()

    member x.Increment() =
        if Interlocked.Increment(&n) >= 100000 then
            printf "UpdateName Time %A" sw.ElapsedMilliseconds

type Message
    = CreateUser of int * string
    | UpdateName of int * string

type User = {
    Id : int
    Name : string
}

[<EntryPoint>]
let main argv = 

    let sw = Stopwatch.StartNew()
    let incr = new Inc()
    let mb = 

        Seq.initInfinite(fun id -> 
            MailboxProcessor<Message>.Start(fun inbox -> 

                let rec loop users =
                    async {
                        let! m = inbox.Receive()

                        match m with
                        | CreateUser(id, name) ->
                            do! loop (Map.add id {Id=id; Name=name} users)

                        | UpdateName(id, newName) ->
                            match Map.tryFind id users with
                            | None -> 
                                do! loop users

                            | Some(user) ->
                                incr.Increment()
                                do! loop (Map.add id {user with Name = newName} users)
                    }

                loop Map.empty
            )
        ) 
        |> Seq.take 8
        |> Array.ofSeq

    printf "Create Time %i\n" sw.ElapsedMilliseconds

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(CreateUser(i, sprintf "User%i-UpdateName" i));

    incr.Start()

    for i in 0 .. 99999 do
        mb.[i % mb.Length].Post(UpdateName(i, sprintf "User%i-UpdateName" i));

    System.Console.ReadLine() |> ignore

    0
Run Code Online (Sandbox Code Playgroud)

所以我的问题在这里,我做错了什么?我是否误解了应该如何使用MailboxProcessor?或者这是预期的表现.

更新:

所以我在## fsharp @ irc.freenode.net上找到了一些人,这告诉我使用sprintf非常慢,而事实证明这是我的大部分性能问题都来自于.但是,删除上面的sprintf操作并且只为每个用户使用相同的名称,我仍然最终需要大约400ms才能进行操作,这感觉非常慢.

Jon*_*rop 16

现在,我意识到所有队列都有开销:在ThreadPool上,在MailboxProcessor内部设置/重置AutoResetEvents等.

而且printf,Map,Seq和争夺的全局可变Inc.而且你正在泄漏堆分配的堆栈帧.事实上,只有一小部分时间用于运行基准测试与此有关MailboxProcessor.

但这真的是预期的表现吗?

我对你的程序的性能并不感到惊讶,但它并没有说明性能MailboxProcessor.

通过阅读MSDN和MailboxProcessor上的各种博客,我已经认识到它将成为erlang演员的亲戚,但从我看到的深渊表现来看,这在现实中似乎并不成立?

MailboxProcessor概念上有点类似二郎的一部分.你所看到的糟糕表现是由于各种各样的事情,其中​​一些是相当微妙的,并将影响任何这样的程序.

所以我的问题在这里,我做错了什么?

我觉得你做错了几件事.首先,你试图解决的问题不明确,所以这听起来像一个XY问题.其次,您正在尝试对错误的事情进行基准测试(例如,您正在抱怨创建一个微秒所需的时间,MailboxProcessor但可能只在建立TCP连接时才会这样做,这需要花费几个数量级的时间).第三,你已经编写了一个基准程序来衡量一些事情的表现,但是把你的观察结果归结为完全不同的事情.

让我们更详细地看一下您的基准程序.在我们做任何其他事情之前,让我们修复一些错误.您应该始终使用它sw.Elapsed.TotalSeconds来测量时间,因为它更精确.您应始终使用return!和不使用异步工作流重复,do!否则您将泄漏堆栈帧.

我的初步时间是:

Creation stage: 0.858s
Post stage: 1.18s
Run Code Online (Sandbox Code Playgroud)

接下来,让我们运行一个配置文件,以确保我们的程序真正花费大部分时间来颠倒F#MailboxProcessor:

77%    Microsoft.FSharp.Core.PrintfImpl.gprintf(...)
 4.4%  Microsoft.FSharp.Control.MailboxProcessor`1.Post(!0)
Run Code Online (Sandbox Code Playgroud)

显然不是我们所希望的.更抽象地思考,我们使用类似的东西生成大量数据sprintf,然后应用它,但我们正在一起进行生成和应用.让我们分离出我们的初始化代码:

let ids = Array.init 100000 (fun id -> {Id = id; Name = sprintf "User%i" id})
...
    ids
    |> Array.map (fun id ->
        MailboxProcessor<Message>.Start(fun inbox -> 
...
            loop id
...
    printf "Create Time %fs\n" sw.Elapsed.TotalSeconds
    let fxs =
      [|for i in 0 .. 99999 ->
          mb.[i % mb.Length].Post, UpdateName(i, sprintf "User%i-UpdateName" i)|]
    incr.Start()
    for f, x in fxs do
      f x
...
Run Code Online (Sandbox Code Playgroud)

现在我们得到:

Creation stage: 0.538s
Post stage: 0.265s
Run Code Online (Sandbox Code Playgroud)

因此创建速度提高了60%,发布速度提高了4.5倍.

让我们尝试完全重写您的基准:

do
  for nAgents in [1; 10; 100; 1000; 10000; 100000] do
    let timer = System.Diagnostics.Stopwatch.StartNew()
    use barrier = new System.Threading.Barrier(2)
    let nMsgs = 1000000 / nAgents
    let nAgentsFinished = ref 0
    let makeAgent _ =
      new MailboxProcessor<_>(fun inbox ->
        let rec loop n =
          async { let! () = inbox.Receive()
                  let n = n+1
                  if n=nMsgs then
                    let n = System.Threading.Interlocked.Increment nAgentsFinished
                    if n = nAgents then
                      barrier.SignalAndWait()
                  else
                    return! loop n }
        loop 0)
    let agents = Array.init nAgents makeAgent
    for agent in agents do
      agent.Start()
    printfn "%fs to create %d agents" timer.Elapsed.TotalSeconds nAgents
    timer.Restart()
    for _ in 1..nMsgs do
      for agent in agents do
        agent.Post()
    barrier.SignalAndWait()
    printfn "%fs to post %d msgs" timer.Elapsed.TotalSeconds (nMsgs * nAgents)
    timer.Restart()
    for agent in agents do
      use agent = agent
      ()
    printfn "%fs to dispose of %d agents\n" timer.Elapsed.TotalSeconds nAgents
Run Code Online (Sandbox Code Playgroud)

此版本期望nMsgs每个代理在该代理之前增加共享计数器,从而大大降低该共享计数器的性能影响.该程序还检查了不同数量的代理的性能.在这台机器上我得到:

Agents  M msgs/s
     1    2.24
    10    6.67
   100    7.58
  1000    5.15
 10000    1.15
100000    0.36
Run Code Online (Sandbox Code Playgroud)

因此,您看到的msg/s速度较低的部分原因似乎是异常大量(100,000)的代理.使用10-1,000个代理程序,F#实现速度比使用100,000个代理程序快10倍以上.

因此,如果您可以使用这种性能,那么您应该能够在F#中编写整个应用程序,但如果您需要获得更多性能,我建议您使用不同的方法.你可能甚至不必牺牲使用F#(并且你当然可以用它来进行原型设计)采用像Disruptor这样的设计.在实践中,我发现在.NET上进行序列化所花费的时间往往远远大于在F#async中花费的时间MailboxProcessor.


Rob*_*sen 2

消除后sprintf,我得到了大约 12 秒(Mac 上的单声道没那么快)。采用 Phil Trelford 的建议,使用 Dictionary 而不是 Map,结果达到了 600 毫秒。还没有在 Win/.Net 上尝试过。

代码更改非常简单,并且本地可变性对我来说是完全可以接受的:

let mb = 
    Seq.initInfinite(fun id -> 
        MailboxProcessor<Message>.Start(fun inbox -> 
            let di = System.Collections.Generic.Dictionary<int,User>()
            let rec loop () =
                async {
                    let! m = inbox.Receive()

                    match m with
                    | CreateUser(id, name) ->
                        di.Add(id, {Id=id; Name=name})
                        return! loop ()

                    | UpdateName(id, newName) ->
                        match di.TryGetValue id with
                        | false, _ -> 
                            return! loop ()

                        | true, user ->
                            incr.Increment()
                            di.[id] <- {user with Name = newName}
                            return! loop ()
                }

            loop ()
        )
    ) 
    |> Seq.take 8
    |> Array.ofSeq
Run Code Online (Sandbox Code Playgroud)

  • 您必须使用“return!”而不是“do!”在异步工作流程中尾部重复,否则您将泄漏(堆分配的)堆栈帧! (4认同)