Cri*_*scu 6 c# system.reactive
我有一个我想用Rx处理的用户交互场景.
该场景类似于规范"当用户停止输入,做一些工作"(通常,搜索用户到目前为止输入的内容)(1) - 但我还需要:
对于(1)我使用a IObservable作为用户事件,限制.Throttle()为仅触发事件之间的暂停("用户停止键入").
从这一点,我.Select(_ => CreateMyTask(...).ToObservable()).
这给了我一个IObservable<IObservable<T>>内部observable包含单个任务的位置.
为了得到(2)我最终申请.Switch()只得到最新工作单元的结果.
那么(3) - 取消待定任务?
如果我理解正确,只要有新的内部IObservable<T>,该.Switch()方法就会订阅它并取消订阅前一个内容,从而导致它们Dispose().
也许这可能以某种方式连线触发任务取消?
Bra*_*don 12
您可以使用Observable.FromAsync哪个将生成在观察者取消订阅时取消的标记:
input.Throttle(...)
.Select(_ => Observable.FromAsync(token => CreateMyTask(..., token)))
.Switch()
.Subscribe(...);
Run Code Online (Sandbox Code Playgroud)
这将为每个工作单元生成一个新令牌,并在每次Switch切换到新工作单时取消它.
您必须使用任务吗?
如果您乐于纯粹使用 Observables 工作,那么您自己也可以做得很好。
尝试做这样的事情:
var query =
Observable.Create<int>(o =>
{
var cancelling = false;
var cancel = Disposable.Create(() =>
{
cancelling = true;
});
var subscription = Observable.Start(() =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling)
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
}).Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
Run Code Online (Sandbox Code Playgroud)
这个可观察量在 for 循环中使用 进行一些艰苦的工作Thread.Sleep(10);,但是当可观察量被处置时,循环将退出,密集的 CPU 工作也会停止。然后您可以使用标准 RxDispose来Switch取消正在进行的工作。
如果您希望将其捆绑在一个方法中,请尝试以下操作:
public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
return Observable.Create<T>(o =>
{
var cancelling = false;
var cancel = Disposable
.Create(() => cancelling = true);
var subscription = Observable
.Start(() => work(() => cancelling))
.Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
}
Run Code Online (Sandbox Code Playgroud)
然后用这样的函数调用它:
Func<Func<bool>, int> work = cancelling =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling())
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
};
Run Code Online (Sandbox Code Playgroud)
这是我的代码,证明这是有效的:
var disposable =
ObservableEx
.Start(work)
.Subscribe(x => Console.WriteLine(x));
Thread.Sleep(500);
disposable.Dispose();
Run Code Online (Sandbox Code Playgroud)
我的输出是“Cancelled on 50”(有时是“Cancelled on 51”)。