如何在C#中等待单个事件,具有超时和取消

lob*_*ism 18 c# events multithreading dispose asynchronous

所以我的要求是让我的函数等待event Action<T>来自另一个类和另一个线程的第一个实例,并在我的线程上处理它,允许等待被超时或中断CancellationToken.

我想创建一个我可以重用的泛型函数.我设法创建了一些(我认为)我需要的选项,但两者看起来都比我想象的要复杂得多.

用法

为了清楚起见,这个函数的示例使用看起来像这样,serialDevice在一个单独的线程上吐出事件:

var eventOccurred = Helper.WaitForSingleEvent<StatusPacket>(
    cancellationToken,
    statusPacket => OnStatusPacketReceived(statusPacket),
    a => serialDevice.StatusPacketReceived += a,
    a => serialDevice.StatusPacketReceived -= a,
    5000,
    () => serialDevice.RequestStatusPacket());
Run Code Online (Sandbox Code Playgroud)

选项1-ManualResetEventSlim

这个选项并不坏,但Dispose处理ManualResetEventSlim比看起来应该是更乱.它让ReSharper适合我在闭包内访问修改/处理的东西,而且真的很难遵循,所以我甚至不确定它是否正确.也许有一些我遗漏的东西可以清理它,这将是我的偏好,但我不会随便看到它.这是代码.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var eventOccurred = false;
    var eventResult = default(TEvent);
    var o = new object();
    var slim = new ManualResetEventSlim();
    Action<TEvent> setResult = result => 
    {
        lock (o) // ensures we get the first event only
        {
            if (!eventOccurred)
            {
                eventResult = result;
                eventOccurred = true;
                // ReSharper disable AccessToModifiedClosure
                // ReSharper disable AccessToDisposedClosure
                if (slim != null)
                {
                    slim.Set();
                }
                // ReSharper restore AccessToDisposedClosure
                // ReSharper restore AccessToModifiedClosure
            }
        }
    };
    subscribe(setResult);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        slim.Wait(msTimeout, token);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(setResult);
        lock(o) // ensure we don't access slim
        {
            slim.Dispose();
            slim = null;
        }
    }
    lock (o) // ensures our variables don't get changed in middle of things
    {
        if (eventOccurred)
        {
            handler(eventResult);
        }
        return eventOccurred;
    }
}
Run Code Online (Sandbox Code Playgroud)

选项2 - 没有a的轮询 WaitHandle

WaitForSingleEvent这里的功能更清洁.我能够使用ConcurrentQueue,因此甚至不需要锁.但我只是不喜欢轮询功能Sleep,我看不出这种方法有什么办法.我想传递一个WaitHandle而不是一个Func<bool>来清理Sleep,但第二个我这样做我已经把整个Dispose混乱再次清理.

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new ConcurrentQueue<TEvent>();
    subscribe(q.Enqueue);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        token.Sleep(msTimeout, () => !q.IsEmpty);
    }
    finally // ensures unsubscription in case of exception
    {
        unsubscribe(q.Enqueue);
    }
    TEvent eventResult;
    var eventOccurred = q.TryDequeue(out eventResult);
    if (eventOccurred)
    {
        handler(eventResult);
    }
    return eventOccurred;
}

public static void Sleep(this CancellationToken token, int ms, Func<bool> exitCondition)
{
    var start = DateTime.Now;
    while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition())
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(1);
    }
}
Run Code Online (Sandbox Code Playgroud)

这个问题

我并不特别关心这些解决方案中的任何一种,也不是100%确定它们中的任何一种都是100%正确的.这些解决方案中的任何一个比其他解决方案更好(惯用性,效率等),还是有更简单的方法或内置函数来满足我在这里需要做的事情?

更新:目前为止的最佳答案

TaskCompletionSource以下解决方案的修改.没有长的闭合,锁或任何需要的东西.看起来非常简单.这里有错误吗?

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var tcs = new TaskCompletionSource<TEvent>();
    Action<TEvent> handler = result => tcs.TrySetResult(result);
    var task = tcs.Task;
    subscribe(handler);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        task.Wait(msTimeout, token);
    }
    finally
    {
        unsubscribe(handler);
        // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx
    }
    if (task.Status == TaskStatus.RanToCompletion)
    {
        onEvent(task.Result);
        return true;
    }
    return false;
}
Run Code Online (Sandbox Code Playgroud)

更新2:另一个很棒的解决方案

事实证明,它的BlockingCollection工作方式与此类似,ConcurrentQueue但也有接受超时和取消令牌的方法.这个解决方案的一个好处是它可以更新,以便WaitForNEvents相当容易:

public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> handler, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null)
{
    var q = new BlockingCollection<TEvent>();
    Action<TEvent> add = item => q.TryAdd(item);
    subscribe(add);
    try
    {
        if (initializer != null)
        {
            initializer();
        }
        TEvent eventResult;
        if (q.TryTake(out eventResult, msTimeout, token))
        {
            handler(eventResult);
            return true;
        }   
        return false;
    }
    finally
    {
        unsubscribe(add);
        q.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

Tho*_*que 5

您可以使用TaskCompletetionSource创建一个Task可以标记为已完成或已取消的。这是特定事件的可能实现:

public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        target.MyEvent -= handler;
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        target.MyEvent -= handler;
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    target.MyEvent += handler;
    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

在C#5中,您可以像这样使用它:

private async Task MyMethod()
{
    ...
    await WaitFirstMyEvent(foo, cancellationToken);
    ...
}
Run Code Online (Sandbox Code Playgroud)

如果要同步等待事件,也可以使用以下Wait方法:

private void MyMethod()
{
    ...
    WaitFirstMyEvent(foo, cancellationToken).Wait();
    ...
}
Run Code Online (Sandbox Code Playgroud)

这是一个更通用的版本,但它仍然仅适用于带有Action签名的事件:

public Task WaitFirstEvent(
    Action<Action> subscribe,
    Action<Action> unsubscribe,
    CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();
    Action handler = null;
    var registration = cancellationToken.Register(() =>
    {
        unsubscribe(handler);
        tcs.TrySetCanceled();
    });
    handler = () =>
    {
        unsubscribe(handler);
        registration.Dispose();
        tcs.TrySetResult(null);
    };
    subscribe(handler);
    return tcs.Task;
}
Run Code Online (Sandbox Code Playgroud)

您可以像这样使用它:

await WaitFirstEvent(
        handler => foo.MyEvent += handler,
        handler => foo.MyEvent -= handler,
        cancellationToken);
Run Code Online (Sandbox Code Playgroud)

如果您希望它与其他事件签名(例如EventHandler)一起使用,则必须创建单独的重载。我认为没有一种简单的方法可以使它适用于任何签名,特别是因为参数的数量并不总是相同的。


Dax*_*ohl 2

您可以使用 Rx 将事件转换为可观察对象,然后转换为任务,最后使用令牌/超时等待该任务。

与任何现有解决方案相比,它的一个优点是它调用unsubscribe事件的线程,确保您的处理程序不会被调用两次。(在您的第一个解决方案中,您可以通过tcs.TrySetResult而不是解决此tcs.SetResult问题,但摆脱“TryDoSomething”并简单地确保 DoSomething 始终有效总是很好)。

另一个优点是代码简单。它本质上是一根线。所以你甚至不需要特别需要一个独立的函数。您可以内联它,以便更清楚您的代码到底做了什么,并且您可以对主题进行变体,而无需大量可选参数(例如您的可选参数,或initializer允许等待 N 个事件,或在以下情况下提前超时/取消)他们没有必要)。当它完成时,你会同时获得bool返回值实际result范围,如果这有用的话。

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
...
public static bool WaitForSingleEvent<TEvent>(this CancellationToken token, Action<TEvent> onEvent, Action<Action<TEvent>> subscribe, Action<Action<TEvent>> unsubscribe, int msTimeout, Action initializer = null) {
    var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask();
    if (initializer != null) {
        initializer();
    }
    try {
        var finished = task.Wait(msTimeout, token);
        if (finished) onEvent(task.Result);
        return finished;
    } catch (OperationCanceledException) { return false; }
}
Run Code Online (Sandbox Code Playgroud)