Ale*_*hov 7 c# asynchronous system.reactive
我正在尝试为项目创建异步单元测试,但无法理解如何等待异步主题完成:
[Test]
public async void MicroTest()
{
var value = 2;
var first = new AsyncSubject<int>();
var second = new AsyncSubject<int>();
first.Subscribe(_ =>
{
value = _;
second.OnCompleted();
});
first.OnNext(1);
// how to wait for the second subject to complete?
Assert.AreEqual(value, 1);
}
Run Code Online (Sandbox Code Playgroud)
此测试的同步版本运行良好:
[Test]
public void MicroTest()
{
var value = 2;
var first = new Subject<int>();
var second = new Subject<int>();
first.Subscribe(_ =>
{
value = _;
second.OnCompleted();
});
first.OnNext(1);
Assert.AreEqual(value, 1);
}
Run Code Online (Sandbox Code Playgroud)
Jam*_*rld 12
首先,值得指出的AsyncSubject<T>
不是异步版本Subject<T>
.两者实际上都是自由线程*(见脚注).
AsyncSubject
是一种特殊化,Subject
旨在用于建模异步完成并返回单个结果的操作.它有两个值得注意的特点:
它在内部用于各种地方,包括ToObservable()
在Task
和上定义的扩展方法Task<T>
.
召回AsyncSubject<T>
只会返回收到的最终结果.它通过等待OnCompleted()来实现这一点,因此它知道最终结果是什么.因为你不叫OnCompleted()
上first
你的测试是有缺陷的OnNext()
处理-将永远不会被调用-在你的订阅调用传递lambda函数.
此外,如果不打电话OnNext()
至少一次是无效的AsyncSubject<T>
,所以当你打电话时await second;
,InvalidOperationException
如果你没有这样做,你会得到一个.
如果你按如下方式编写测试,一切都很好:
[Test]
public async void MicroTest()
{
var value = 2;
var first = new AsyncSubject<int>();
var second = new AsyncSubject<int>();
first.Subscribe(_ =>
{
// won't be called until an OnCompleted() has
// been invoked on first
value = _;
// you must send *some* value to second
second.OnNext(_);
second.OnCompleted();
});
first.OnNext(1);
// you must do this for OnNext handler to be called
first.OnCompleted();
// how to wait for the second subject to complete
await second;
Assert.AreEqual(value, 1);
}
Run Code Online (Sandbox Code Playgroud)
作为一般规则,我会避免编写可以永远等待的异步测试.当它导致构建服务器上的资源耗尽时,这尤其令人讨厌.使用某种超时,例如:
await second.Timeout(TimeSpan.FromSeconds(1));
Run Code Online (Sandbox Code Playgroud)
无需处理异常,因为这足以使测试失败.
**我从COM词典中借用了这个术语.从这个意义上讲,我的意思是,与大多数Rx框架组件一样,它们通常会在您调用其方法的任何线程上运行.自由线程并不一定意味着完全线程安全.特别是,不同于AsyncSubject<T>
,Subject<T>
不保护您免受Rx语法违反进行重叠调用OnNext
.使用Subject.Synchronize
或Observable.Synchronize
保护.*