我有一堆线程生成类型A和类型的事件B.
我的程序接收这些事件,将它们包装在一条消息中并通过网络发送.消息可以包含一个A事件,一个B事件或一个A事件和一个B事件:
SendMessage(new Message(a: 1, b: null));
SendMessage(new Message(a: null, b: 2 ));
SendMessage(new Message(a: 3, b: 4 ));
Run Code Online (Sandbox Code Playgroud)
类型A事件经常发生,而类型事件发生的频率则B低得多.因此,当一个线程生成一个B事件时,我的程序会稍等一下,看看另一个线程是否生成一个A事件,A并在B可能的情况下组合事件和事件.
这是我的代码:
object gate = new object();
int? pendingB;
Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, millisecondsTimeout);
if (pendingB != b) return null;
pendingB = null;
}
}
return new Message(null, b);
}
Run Code Online (Sandbox Code Playgroud)
这项工作到目前为止.但是,有两个问题:
如果有大量A事件和大量B事件,则算法效率不高:即使有足够的事件,B事件也只会附加一定比例的事件.AA
如果暂时没有A生成事件(不常见,但并非不可能),则该算法完全不公平:一个生成B事件的线程必须每次等待,而所有其他线程可以立即发送它们的B事件.
如何提高算法的效率和公平性?
约束:
• WrapA并且WrapB必须在短暂的确定时间内终止.
• SendMessage必须在任何锁之外调用.
•除了以外没有可用的同步机制gate.
•没有其他线程,任务,计时器等可用.
•由于类型的事件A在正常情况下经常发生,所以忙碌等待WrapB很好.
这是一个可以用作基准的测试程序:
public static class Program
{
static int counter0 = 0;
static int counterA = 0;
static int counterB = 0;
static int counterAB = 0;
static void SendMessage(Message m)
{
if (m != null)
if (m.a != null)
if (m.b != null)
Interlocked.Increment(ref counterAB);
else
Interlocked.Increment(ref counterA);
else
if (m.b != null)
Interlocked.Increment(ref counterB);
else
Interlocked.Increment(ref counter0);
}
static Thread[] Start(int threadCount, int eventCount,
int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
{
Thread[] threads = new Thread[threadCount * eventCount];
for (int i = 0; i < threadCount; i++)
{
for (int j = 0; j < eventCount; j++)
{
int k = i * 1000 + j;
int l = j * eventInterval + i;
threads[i * eventCount + j] = new Thread(() =>
{
Thread.Sleep(l);
SendMessage(wrap(k, wrapTimeout));
});
threads[i * eventCount + j].Start();
}
}
return threads;
}
static void Join(params Thread[] threads)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i].Join();
}
}
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
// Only A events
var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
Join(t0);
// A and B events
var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
Join(t1);
Join(t2);
// Only B events
var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", counter0);
Console.WriteLine("A: {0}", counterA);
Console.WriteLine("B: {0}", counterB);
Console.WriteLine("AB: {0}", counterAB);
Console.WriteLine("Generated A: {0}, Sent A: {1}",
10 * 40 + 10 * 40, counterA + counterAB);
Console.WriteLine("Generated B: {0}, Sent B: {1}",
10 * 10 + 10 * 20, counterB + counterAB);
}
}
Run Code Online (Sandbox Code Playgroud)
为了它的乐趣,这是一个无锁实现:
public sealed class MessageWrapper
{
private int pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
int b = Interlocked.Exchange(ref pendingB, -1);
return new Message(a, b == -1 ? null : b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
var sw = new SpinWait();
while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
{
// Spin
sw.SpinOnce();
if (sw.NextSpinWillYield)
{
// Let us make progress instead of yielding the processor
// (avoid context switch)
return new Message(null, b);
}
}
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
结果
原始实施:
00:00:02.0433298
0: 0
A: 733
B: 233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
无锁实施:
00:00:01.2546310
0: 0
A: 717
B: 217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
更新
不幸的是,上面的实现有一个bug加上一些缺点.这是一个改进版本:
public class MessageWrapper
{
private int pendingB = EMPTY;
private const int EMPTY = -1;
public Message WrapA(int a, int millisecondsTimeout)
{
int? b;
int count = 0;
while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
{
if (count % 7 == 0)
{
Thread.Sleep(0);
}
else if (count % 23 == 0)
{
Thread.Sleep(1);
}
else
{
Thread.Yield();
}
if (++count == 480)
{
return new Message(a, null);
}
}
return new Message(a, b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
int count = 0;
while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
{
// Spin
Thread.SpinWait((4 << count++));
if (count > 10)
{
// We didn't manage to place our payload.
// Let's send it ourselves:
return new Message(null, b);
}
}
// We placed our payload.
// Wait some more to see if some WrapA snatches it.
while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
{
Thread.SpinWait((4 << count++));
if (count > 20)
{
// No WrapA came along. Pity, we will have to send it ourselves
int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
return payload == b ? new Message(null, b) : null;
}
}
return null;
}
}
Run Code Online (Sandbox Code Playgroud)
结果:
OP的实施
00:00:02.1389474
0: 0
A: 722
B: 222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
第二次实施:
00:00:01.2752425
0: 0
A: 700
B: 200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
为了多样性,我尝试了一种基于并发集合的方法.对我来说,从发布的限制中不清楚这是否可以,但我还是会拍我的答案:
这是我机器上原始代码的典型输出:
00:00:01.7835426
0: 0
A: 723
B: 223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
这是我的建议的典型输出,比原始代码慢约20%,但它捕获更多'AB'消息:
00:00:02.1322512
0: 0
A: 701
B: 201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
Run Code Online (Sandbox Code Playgroud)
MessageWrapper实现:
public class MessageWrapper
{
private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
private BlockingCollection<int?> messageB = new BlockingCollection<int?>();
public Message WrapA(int a, int millisecondsTimeout)
{
messageA.Add(a);
return CreateMessage(0);
}
public Message WrapB(int b, int millisecondsTimeout)
{
messageB.Add(b);
return CreateMessage(millisecondsTimeout);
}
private Message CreateMessage(int timeout)
{
int? a, b;
if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
{
return new Message(a, b);
}
else
{
return null;
}
}
}
Run Code Online (Sandbox Code Playgroud)