跨流程事件 - 可靠地释放所有服务员

Alo*_*aus 8 .net c# windows multithreading

我通过ManualResetEvent创建了一个跨进程事件.当此事件确实发生时,应该取消阻止n个不同进程中的n个线程并开始运行以获取新数据.问题是,似乎ManualResetEvent.Set后跟立即重置不会导致所有等待线程被唤醒.那里的文档很模糊

http://msdn.microsoft.com/en-us/library/windows/desktop/ms682396(v=vs.85).aspx

当手动重置事件对象的状态发出信号时,它将保持信号状态,直到它被ResetEvent函数显式重置为无信号.可以在发出对象状态的信号时释放任意数量的等待线程或随后开始对指定事件对象执行等待操作的线程.

有一种名为PulseEvent的方法似乎正是我所需要的,但遗憾的是它也存在缺陷.

等待同步对象的线程可以通过内核模式APC暂时从等待状态中移除,然后在APC完成后返回到等待状态.如果在线程从等待状态中删除期间发生对PulseEvent的调用,则不会释放该线程,因为PulseEvent仅释放那些在被调用时正在等待的线程.因此,PulseEvent不可靠,不应被新应用程序使用.相反,使用条件变量.

现在MS建议使用条件变量.

条件变量是同步原语,它使线程能够等到特定条件发生.条件变量是不能跨进程共享的用户模式对象.

按照文档,我似乎运气不好,可靠地做到了.是否有一种简单的方法可以在没有一个ManualResetEvent的规定限制的情况下完成同样的事情,或者我是否需要为每个侦听器进程创建一个响应事件来为每个订阅的调用者获取ACK?在这种情况下,我需要一个小的共享内存来注册订阅进程的pids,但这似乎带来了自己的一组问题.当一个进程崩溃或没有响应时会发生什么?....

给出一些背景信息.我有新的状态要发布所有其他进程应该从共享内存位置读取.当一次发生多个更新但是进程必须至少读取最后的最新值时,可以错过一次更新.我可以用超时轮询,但这似乎不是一个正确的解决方案.

目前我很感兴趣

ChangeEvent = new EventWaitHandle(false, EventResetMode.ManualReset, counterName + "_Event");

ChangeEvent.Set();
Thread.Sleep(1); // increase odds to release all waiters
ChangeEvent.Reset();
Run Code Online (Sandbox Code Playgroud)

rlb*_*rlb 5

处理生产者必须唤醒所有消费者和消费者数量的情况的一个通用选择是使用移动围栏方法.此选项也需要共享内存IPC区域.该方法有时会导致消费者在没有工作时被唤醒,特别是如果许多流程需要调度和负载很高,但除了无望的超载机器外,它们总会被唤醒.

创建几个手动重置事件,让生产者维护一个计数器,以便设置下一个事件.除NextToFire事件外,所有事件都保留设置.消费者进程等待NextToFire事件.当制作人希望唤醒所有消费者时,它会重置Next + 1事件并设置当前事件.最终将安排所有消费者,然后等待新的NextToFire活动.结果是只有生产者使用ResetEvent,但消费者总是知道下一个唤醒它们的事件.

所有用户初始化:(伪代码是C/C++,而不是C#)

// Create Shared Memory and initialise NextToFire;
pSharedMemory = MapMySharedMemory();
if (First to create memory) pSharedMemory->NextToFire = 0;

HANDLE Array[4];
Array[0] = CreateEvent(NULL, 1, 0, "Event1");
Array[1] = CreateEvent(NULL, 1, 0, "Event2");
Array[2] = CreateEvent(NULL, 1, 0, "Event3");
Array[3] = CreateEvent(NULL, 1, 0, "Event4");
Run Code Online (Sandbox Code Playgroud)

制片人唤醒所有人

long CurrentNdx = pSharedMemory->NextToFire;
long NextNdx = (CurrentNdx+1) & 3;

// Reset next event so consumers block
ResetEvent(Array[NextNdx]);

// Flag to consumers new value
long Actual = InterlockedIncrement(&pSharedMemory->NextToFire) & 3;

// Next line needed if multiple producers active.
// Not a perfect solution
if (Actual != NextNdx) ResetEvent(Actual);

// Now wake them all up
SetEvent(CurrentNdx);
Run Code Online (Sandbox Code Playgroud)

消费者等待逻辑

long CurrentNdx = (pSharedMemory->NextToFire) & 3;
WaitForSingleObject(Array[CurrentNdx],  Timeout);
Run Code Online (Sandbox Code Playgroud)