Ron*_*erg 5 c# system.reactive cancellation
我有一个非常简单的IObservable<int>,每500毫秒充当一个脉冲发生器:
var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
i => TimeSpan.FromMilliseconds(500))
Run Code Online (Sandbox Code Playgroud)
我有一个CancellationTokenSource(用于取消同时进行的其他工作).
如何使用取消令牌源取消我的可观察序列?
这是一个旧线程,但仅供将来参考,这是一种更简单的方法.
如果您有CancellationToken,则可能已经在处理任务.因此,只需将其转换为Task并让框架执行绑定:
using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);
Run Code Online (Sandbox Code Playgroud)
这将创建一个内部订户,将在取消任务时进行处理.在大多数情况下,这将起到作用,因为大多数可观察者只有在有订阅者时才会产生值.
现在,如果你有一个实际的observable需要由于某种原因被处置(如果父任务被取消,可能是一个不再重要的热观察),这可以通过延续来实现:
disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
if (t.Status == TaskStatus.Canceled)
disposableObservable.Dispose();
});
Run Code Online (Sandbox Code Playgroud)
如果您正在使用GenerateWithTime(现在替换为Generate传入时间跨度函数重载),您可以替换第二个参数来评估取消令牌的状态,如下所示:
var pulses = Observable.Generate(0,
i => !ts.IsCancellationRequested,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(500));
Run Code Online (Sandbox Code Playgroud)
或者,如果您设置取消令牌的事件可以转换为可观察的本身,您可以使用以下内容:
pulses.TakeUntil(CancelRequested);
Run Code Online (Sandbox Code Playgroud)
我在http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable上发布了更详细的解释.
这里有两个方便的运算符,用于取消可观察的序列。它们之间的区别在于取消时会发生什么。导致TakeUntil序列正常完成 ( OnCompleted),而WithCancellation导致异常终止 ( OnError)。
/// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(observer =>
cancellationToken.Register(() => observer.OnNext(default))));
}
/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
this IObservable<TSource> source, CancellationToken cancellationToken)
{
return source
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
Run Code Online (Sandbox Code Playgroud)
使用示例:
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var pulses = Observable
.Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
.WithCancellation(cts.Token);
Run Code Online (Sandbox Code Playgroud)
注意:如果取消,上面提供的自定义运算符会立即从基础可观察对象中取消订阅。如果可观察到的结果包含副作用,则需要考虑这一点。将 放在TakeUntil(cts.Token)执行副作用的运算符之前将推迟整个可观察量的完成,直到副作用完成(优雅终止)。将其放在副作用之后将使取消立即发生,从而可能导致任何正在运行的代码以“即发即忘”的方式继续运行而不被观察到。
| 归档时间: |
|
| 查看次数: |
8070 次 |
| 最近记录: |