dot*_*tnz 3 c# multithreading system.reactive
我有一个应用程序与多个观察者一起运行可观察的间隔。每隔 0.5 秒,该时间间隔就会从 Web 服务器加载一些 XML 数据,然后观察者在后台线程上执行一些特定于应用程序的处理。一旦不再需要数据,订阅和可观察间隔就会被释放,因此观察者的 OnNext/OnCompleted/OnError 将不再被调用。到目前为止,一切都很好。
我的问题:在某些罕见的情况下,调用 Dispose 后我的观察者的 OnNext 方法可能仍在运行!在处理后进行进一步操作之前,我想确保OnNext已经完成。
我当前的解决方案:我在观察者类中引入了一个储物柜字段(请参阅代码)。处置后,我尝试获取锁,并仅在获取锁后才继续。虽然这个解决方案有效(?),但在某种程度上我感觉不对。
问题:有没有更优雅、更“Rx Way”来解决这个问题?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace RxExperimental
{
internal sealed class MyXmlDataFromWeb
{
public string SomeXmlDataFromWeb { get; set; }
}
internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
{
private readonly object _locker = new object();
private readonly string _observerName;
public MyObserver(string observerName) {
this._observerName = observerName;
}
public object Locker {
get { return this._locker; }
}
public void OnCompleted() {
lock (this._locker) {
Console.WriteLine("{0}: Completed.", this._observerName);
}
}
public void OnError(Exception error) {
lock (this._locker) {
Console.WriteLine("{0}: An error occured: {1}", this._observerName, error.Message);
}
}
public void OnNext(MyXmlDataFromWeb value) {
lock (this._locker) {
Console.WriteLine(" {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(" {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
Thread.Sleep(5000); // simulate some long running operation
Console.WriteLine(" {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
}
}
}
internal sealed class Program
{
private static void Main() {
const int interval = 500;
//
var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
var data = new MyXmlDataFromWeb {
SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
};
return data;
}).Publish();
//
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
//
var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
//
var connection = dataSource.Connect();
//
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
//
subscription1.Dispose();
subscription2.Dispose();
connection.Dispose();
//
lock (observer1.Locker) {
Console.WriteLine("Observer 1 completed.");
}
lock (observer2.Locker) {
Console.WriteLine("Observer 2 completed.");
}
//
Console.WriteLine("Can only be executed, after all observers completed.");
}
}
}
Run Code Online (Sandbox Code Playgroud)
是的,还有一种更 Rx 的方式可以做到这一点。
第一个观察结果是,取消订阅可观察流本质上与观察者中当前发生的事情无关。确实没有任何反馈。由于您需要明确知道观察何时结束,因此您需要将其建模到可观察流中。换句话说,您应该完成流,以便能够观察OnComplete事件,而不是取消订阅流。在您的情况下,您可以使用TakeUntil结束可观察对象而不是取消订阅。
第二个观察是你的主程序需要观察你的“观察者”何时完成其工作。但既然你让你的“观察者”成为一个实际的IObservable,你就没有办法做到这一点。当人们第一次开始使用 Rx 时,我发现这是一个常见的混乱来源。如果您将“观察者”建模为可观察链中的另一个链接,那么您的主程序就可以观察。具体来说,您的“观察者”只不过是一个映射操作(有副作用),它将传入的 Xml 数据映射到“完成”消息中。
所以如果你重构你的代码,你就能得到你想要的......
public class MyObserver
{
private readonly string _name;
public MyObserver(string name) { _name = name; }
public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb source)
{
return source.Select(value =>
{
Thread.Sleep(5000); // simulate work
return Unit.Default;
});
}
}
// main
var endSignal = new Subject<Unit>();
var dataSource = Observable
.Interval(...)
.Select(...)
.TakeUntil(endSignal)
.Publish();
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
var results1 = observer1.Handle(dataSource.ObserveOn(...));
var results2 = observer2.Handle(dataSource.ObserveOn(...));
// since you just want to know when they are all done
// just merge them.
// use ToTask() to subscribe them and collect the results
// as a Task
var processingDone = results1.Merge(results2).Count().ToTask();
dataSource.Connect();
Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();
// end the stream
endSignal.OnNext(Unit.Default);
// wait for the processing to complete.
// use await, or Task.Result
var numProcessed = await processingDone;
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3628 次 |
| 最近记录: |