如何在观察CancellationToken的同时等待一个等待对象?

tea*_*sus 3 c# asynchronous task-parallel-library system.reactive async-await

我有一个等待的对象不是Task(例如,IObservable<T>安装了RX).我想创建一个Task,如果提供CancellationToken被取消将最终取消,否则返回等待对象的结果.我想出了以下代码:

public static Task ObserveTokenAsync(CancellationToken token)
{
    TaskCompletionSource<Unit> tcs = new TaskCompletionSource<Unit>();
    token.Register(() => tcs.SetCanceled());
    return tcs.Task;
}

public static async Task<T> WrapObservableAsync<T>(IObservable<T> obs)
{
    return await obs;
}

public static async Task<T> AwaitWhileObservingTokenAsync<T>(IObservable<T> obs, CancellationToken token)
{
    var obsTask = WrapObservableAsync(obs);
    var tokenTask = ObserveTokenAsync(token);
    await Task.WhenAny(obsTask, tokenTask).ConfigureAwait(false);
    token.ThrowIfCancellationRequested();
    return obsTask.Result;
}
Run Code Online (Sandbox Code Playgroud)

使用这种方法,我将需要为我将使用的每个等待类型重载/重写最后两种方法.有没有更好的方法来做到这一点?我的猜测是INotifyCompletion界面可能在某种程度上有用.

此外,ObserveTokenAsync如果提供的令牌不会被取消,该方法是否会导致某种资源/内存泄漏?如果是,将在TaskCompletionSource方法结束时设置为完成AwaitWhileObservingTokenAsync将是一个很好的方法来解决它?

Dan*_*ber 6

使用TaskObservableExtensions.ToTaskRx中的扩展方法.它需要a CancellationToken并将从可观察的最后一个元素获取完成返回的任务.当CancellationToken取消时,返回的任务将OperationCanceledException在等待时抛出.如果observable不包含任何元素,则返回的任务将在等待时抛出异常(可能InvalidOperationException虽然我不得不查看它).