mar*_*ark 2 c# asynchronous system.reactive observable
请注意以下单元测试:
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
namespace UnitTests
{
[TestClass]
public class TestRx
{
public const int UNIT_TEST_TIMEOUT = 5000;
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
});
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void Subscribe()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
}, e => Assert.Fail(), () =>
{
Assert.AreEqual(100, i);
tcs.TrySetResult(null);
});
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeCancel()
{
var tcs = new TaskCompletionSource<object>();
var cts = new CancellationTokenSource();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
cts.Cancel();
}
}, e =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, () =>
{
Assert.IsTrue(i < 100);
tcs.TrySetResult(null);
}, cts.Token);
tcs.Task.Wait();
}
[TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
public void SubscribeThrow()
{
var tcs = new TaskCompletionSource<object>();
int i = 0;
GetObservable().Subscribe(n =>
{
Assert.AreEqual(i, n);
++i;
if (i == 5)
{
throw new Exception("xo-xo");
}
}, e =>
{
Assert.AreEqual("xo-xo", e.Message);
tcs.TrySetResult(null);
}, Assert.Fail);
tcs.Task.Wait();
}
}
}
Run Code Online (Sandbox Code Playgroud)
单元测试SubscribeCancel和SubscribeThrow超时,因为OnError回调永远不会被调用,因此对任务的等待永远不会结束。
怎么了?
聚苯乙烯
这个问题与如何正确地用 IObservable 包装 SqlDataReader相关?
编辑
与此同时,我创建了一个新的 Rx 问题 - https://rx.codeplex.com/workitem/74
编辑2
以下观察者实现产生完全相同的结果,即使它符合Rx 设计指南的第 6.5 段- “订阅实现不应抛出”:
private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
return Observable.Create<int>(async (obs, cancellationToken) =>
{
try
{
for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
{
int value = i;
obs.OnNext(await Task.Factory.StartNew(() =>
{
Thread.Sleep(msWait);
return value;
}));
}
obs.OnCompleted();
}
catch (Exception exc)
{
obs.OnError(exc);
}
});
}
Run Code Online (Sandbox Code Playgroud)
编辑3
我开始相信,当异步可观察序列集成到其他同步代码中时,应该编写这样的代码(通常在服务器端的某个地方就是这种情况):
var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
try
{
...
}
catch (Exception e)
{
DoErrorLogic();
tcs.TrySetException(e);
}
}, e =>
{
DoErrorLogic();
tcs.TrySetException(e);
}, () =>
{
DoCompletedLogic();
tcs.TrySetResult(null);
});
tcs.Task.Wait();
Run Code Online (Sandbox Code Playgroud)
真的是这样吗?
编辑 4
我想它终于开始在我生锈的大脑中流淌,你想说的话。我现在将切换到我的另一篇文章 -如何正确使用 IObservable 包装 SqlDataReader?
此行为是设计使然。如果订阅者抛出异常(顺便说一句,这是不好的做法),Rx 框架会正确地推断它已死并且不再与其通信。如果订阅被取消,这也不是错误——只是请求不再发送任何类型的事件——这是 Rx 尊重的。
我认为在文档中没有一个简单的参考可以指出 - 您所看到的行为是如此内在,它是隐含的。我能得到的最接近的是指向AnonymousSafeObserver和 AutoDetatchObserver的源代码。后者有一个可能有帮助的解释性场景,但它有点复杂。
也许一个类比会有所帮助。想象一下,数据流事件是由报刊代理传送的报纸。订阅者是家庭。
订阅者抛出异常
报刊亭高兴地送报纸,直到有一天,其中一位订阅者——一位琼斯先生——打开煤气,他的房子爆炸了,炸死了琼斯先生并摧毁了房子(抛出未处理的异常)。报刊亭意识到他不能再向琼斯先生递送报纸,也不能发送终止通知,而且报纸供应没有问题(因此 OnError 或 OnCompleted 不合适),报刊亭继续减少一个订阅者。
相比之下,报纸印刷商无意中使用了易燃墨水,并将工厂置于火海之中。现在,可怜的报刊经销商确实必须向所有订阅者发送一个解释性说明 (OnError),说明供应已无限期停止。
订阅者取消订阅
琼斯先生正在接收他订阅的报纸,直到有一天他觉得自己厌倦了无休止的令人沮丧的故事洪流,并要求取消订阅。报刊亭有义务。他没有给琼斯先生发一张便条,说明报纸已经停止印刷版本(没有 OnCompleted)——他们没有。他也没有给琼斯先生发一张纸条,说明报纸停业了(没有 OnError)——他只是按照琼斯先生的要求停止送报。
我同情你的挣扎。我注意到在您的代码中,您一直试图将 TPL(任务)习语与 Rx 的习语结合起来。这样的尝试常常让人觉得很笨拙,因为它们真的是完全不同的世界。很难对这样的段落发表评论:
我开始相信,当异步可观察序列集成到其他同步代码中时,应该编写这样的代码(通常在服务器端的某个地方就是这种情况):
非常同意 Brandon 的精妙断言,我想不出在哪些实例中以您尝试的方式将异步代码与服务器端的同步代码集成在一起真的很合适。这对我来说就像一种设计气味。习惯上,人们会尝试保持代码反应性 - 进行订阅,并让订阅者反应性地处理工作。我不记得遇到过按照您描述的方式转换为同步代码的必要性。
当然,查看您在 Edit3 中编写的代码,不清楚您要实现的目标。对订阅者中的错误做出反应不是源的责任。这是摇狗的尾巴。需要在那里确保订阅者服务连续性的异常处理程序应该在订阅处理代码中,而不是在源 observable 中——它应该只关心自身免受流氓观察者行为的影响。这种逻辑在上面链接的 AnonymousSafeObserver 中实现,并且被大多数 Rx 提供的操作符使用。observable 很可能具有处理其源数据连续性的逻辑- 但这是一个不同的问题,而不是您在代码中解决的问题。
无论您在何处尝试通过调用ToTask或桥接同步代码Wait,都可能需要仔细考虑您的设计。
我觉得提供更具体的问题陈述 - 可能来自您试图解决的真实世界场景 - 将有助于为您提供更有用的建议。你说的“SqlDataReader”示例......
最后,人们可能会通过订阅直接使用 observable [包装 SqlDataReader],但他们必须在某个时刻等待结束(阻塞线程),因为周围的大部分代码仍然是同步的。
... 突出了您所处的设计泥潭。在这种情况下,正如您推断的那样,这些消费者显然最好使用IEnumerable<T>界面 - 或者可能要求IObservable<List<T>>. 但关键是要放眼大局,您试图将 SqlDataReader 包装在一个可观察的包装器中这一事实完全是一种设计味道——因为这是一种响应特定一次性请求的固定数据供应。这可能是一个异步场景 - 但不是真正的反应式场景。与更典型的反应性场景形成对比,例如“每当您获得股票 X 的价格时,将它们发送给我”,在这种情况下,您完全按照源的要求设置未来的数据流,以便订阅者做出反应。
| 归档时间: |
|
| 查看次数: |
1341 次 |
| 最近记录: |