结合时间紧密的事件,提高效率和公平性

dtb*_*dtb 26 c# concurrency

我有一堆线程生成类型A和类型的事件B.

我的程序接收这些事件,将它们包装在一条消息中并通过网络发送.消息可以包含一个A事件,一个B事件或一个A事件和一个B事件:

SendMessage(new Message(a: 1,    b: null));
SendMessage(new Message(a: null, b: 2   ));
SendMessage(new Message(a: 3,    b: 4   ));
Run Code Online (Sandbox Code Playgroud)

类型A事件经常发生,而类型事件发生的频率则B低得多.因此,当一个线程生成一个B事件时,我的程序会稍等一下,看看另一个线程是否生成一个A事件,A并在B可能的情况下组合事件和事件.

这是我的代码:

object gate = new object();
int? pendingB;

Message WrapA(int a, int millisecondsTimeout)
{
    int? b;

    lock (gate)
    {
        b = pendingB;
        pendingB = null;
        Monitor.Pulse(gate);
    }

    return new Message(a, b);
}

Message WrapB(int b, int millisecondsTimeout)
{
    lock (gate)
    {
        if (pendingB == null)
        {
            pendingB = b;
            Monitor.Wait(gate, millisecondsTimeout);
            if (pendingB != b) return null;
            pendingB = null;
        }
    }

    return new Message(null, b);
}
Run Code Online (Sandbox Code Playgroud)

这项工作到目前为止.但是,有两个问题:

  • 如果有大量A事件和大量B事件,则算法效率不高:即使有足够的事件,B事件也只会附加一定比例的事件.AA

  • 如果暂时没有A生成事件(不常见,但并非不可能),则该算法完全不公平:一个生成B事件的线程必须每次等待,而所有其他线程可以立即发送它们的B事件.

如何提高算法的效率和公平性?

约束:
•  WrapA并且WrapB必须在短暂的确定时间内终止.
•  SendMessage必须在任何锁之外调用.
•除了以外没有可用的同步机制gate.
•没有其他线程,任务,计时器等可用.
•由于类型的事件A在正常情况下经常发生,所以忙碌等待WrapB很好.


这是一个可以用作基准的测试程序:

public static class Program
{
    static int counter0 = 0;
    static int counterA = 0;
    static int counterB = 0;
    static int counterAB = 0;

    static void SendMessage(Message m)
    {
        if (m != null)
            if (m.a != null)
                if (m.b != null)
                    Interlocked.Increment(ref counterAB);
                else
                    Interlocked.Increment(ref counterA);
            else
                if (m.b != null)
                    Interlocked.Increment(ref counterB);
                else
                    Interlocked.Increment(ref counter0);
    }

    static Thread[] Start(int threadCount, int eventCount,
        int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
    {
        Thread[] threads = new Thread[threadCount * eventCount];
        for (int i = 0; i < threadCount; i++)
        {
            for (int j = 0; j < eventCount; j++)
            {
                int k = i * 1000 + j;
                int l = j * eventInterval + i;
                threads[i * eventCount + j] = new Thread(() =>
                {
                    Thread.Sleep(l);
                    SendMessage(wrap(k, wrapTimeout));
                });
                threads[i * eventCount + j].Start();
            }
        }
        return threads;
    }

    static void Join(params Thread[] threads)
    {
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i].Join();
        }
    }

    public static void Main(string[] args)
    {
        var wrapper = new MessageWrapper();
        var sw = Stopwatch.StartNew();

        // Only A events
        var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
        Join(t0);

        // A and B events
        var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
        var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
        Join(t1);
        Join(t2);

        // Only B events
        var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
        Join(t3);

        Console.WriteLine(sw.Elapsed);

        Console.WriteLine("0:  {0}", counter0);
        Console.WriteLine("A:  {0}", counterA);
        Console.WriteLine("B:  {0}", counterB);
        Console.WriteLine("AB: {0}", counterAB);

        Console.WriteLine("Generated A: {0}, Sent A: {1}",
            10 * 40 + 10 * 40, counterA + counterAB);
        Console.WriteLine("Generated B: {0}, Sent B: {1}",
            10 * 10 + 10 * 20, counterB + counterAB);
    }
}
Run Code Online (Sandbox Code Playgroud)

afr*_*hke 7

为了它的乐趣,这是一个无锁实现:

public sealed class MessageWrapper
{
    private int pendingB;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int b = Interlocked.Exchange(ref pendingB, -1);
        return new Message(a, b == -1 ? null : b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        var sw = new SpinWait();
        while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
        {
            // Spin
            sw.SpinOnce();

            if (sw.NextSpinWillYield)
            {
                // Let us make progress instead of yielding the processor
                // (avoid context switch)
                return new Message(null, b);
            }
        }

        return null;
    }
}
Run Code Online (Sandbox Code Playgroud)

结果

原始实施:

00:00:02.0433298
0:  0
A:  733
B:  233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)

无锁实施:

00:00:01.2546310
0:  0
A:  717
B:  217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)

更新

不幸的是,上面的实现有一个bug加上一些缺点.这是一个改进版本:

public class MessageWrapper
{
    private int pendingB = EMPTY;
    private const int EMPTY = -1;

    public Message WrapA(int a, int millisecondsTimeout)
    {
        int? b;
        int count = 0;
        while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
        {
            if (count % 7 == 0)
            {
                Thread.Sleep(0);
            }
            else if (count % 23 == 0)
            {
                Thread.Sleep(1);
            }
            else
            {
                Thread.Yield();
            }
            if (++count == 480)
            {
                return new Message(a, null);
            }
        }
        return new Message(a, b);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        int count = 0;
        while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
        {
            // Spin
            Thread.SpinWait((4 << count++));
            if (count > 10)
            {
                // We didn't manage to place our payload.
                // Let's send it ourselves:
                return new Message(null, b);
            }
        }

        // We placed our payload. 
        // Wait some more to see if some WrapA snatches it.
        while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
        {
            Thread.SpinWait((4 << count++));
            if (count > 20)
            {
                // No WrapA came along. Pity, we will have to send it ourselves
                int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
                return payload == b ? new Message(null, b) : null;
            }
        }
        return null;
    }
}
Run Code Online (Sandbox Code Playgroud)

结果:

OP的实施

00:00:02.1389474
0:  0
A:  722
B:  222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)

第二次实施:

00:00:01.2752425
0:  0
A:  700
B:  200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)


Chr*_*fer 5

为了多样性,我尝试了一种基于并发集合的方法.对我来说,从发布的限制中不清楚这是否可以,但我还是会拍我的答案:

这是我机器上原始代码的典型输出:

00:00:01.7835426
0:  0
A:  723
B:  223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)

这是我的建议的典型输出,比原始代码慢约20%,但它捕获更多'AB'消息:

00:00:02.1322512
0:  0
A:  701
B:  201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)

MessageWrapper实现:

public class MessageWrapper
{
    private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
    private BlockingCollection<int?> messageB = new BlockingCollection<int?>();

    public Message WrapA(int a, int millisecondsTimeout)
    {
        messageA.Add(a);
        return CreateMessage(0);
    }

    public Message WrapB(int b, int millisecondsTimeout)
    {
        messageB.Add(b);
        return CreateMessage(millisecondsTimeout);
    }

    private Message CreateMessage(int timeout)
    {
        int? a, b;

        if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
        {
            return new Message(a, b);
        }
        else
        {
            return null;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)