为什么IObservable <T> .First()阻止?

ala*_*irs 10 c# reactive-programming system.reactive

我一直在努力试探最近的.NET Reactive Extensions,但是已经触及了一些概念墙:我无法弄清楚为什么IObservable.First()会阻塞.

我有一些示例代码看起来像这样:

var a = new ListItem(a);
var b = new ListItem(b);
var c = new ListItem(c);
var d = new ListItem(d);

var observableList = new List<ListItem> { a,b,c,d }.ToObservable();

var itemA = observableList.First();

// Never reached
Assert.AreEqual(expectedFoo, itemA.Foo);
Run Code Online (Sandbox Code Playgroud)

我期待发生的事情是itemA在引用上等于a并且能够访问其成员等.相反,发生的是First()块和Assert.AreEqual()从未到达的块.

现在,我知道在使用Rx时,代码应该Subscribe()IObservables,所以很可能我在这里做错了.但是,根据各种方法签名,不可能执行以下任一操作:

observableList.First().Subscribe(item => Assert.AreEqual(expectedFoo, item));
Run Code Online (Sandbox Code Playgroud)

要么

observableList.Subscribe(SomeMethod).First() // This doesn't even make sense, right?
Run Code Online (Sandbox Code Playgroud)

我错过了什么?

ala*_*irs 9

在测试项目中尝试使用此代码工作正常,因此我重新查看了有问题的代码.事实证明问题是因为它IObservable<ListItem>Publish()封装在某个地方,因此被转换成了一个IConnectableObservable<ListItem>.没有连接调用,订阅从未"激活".

  • 你的问题有点模棱两可.Scott Weinstein的答案在技术上是正确的.First()是一个阻塞运算符.在您的情况下,您希望您的程序立即继续,因为您的Observable应该在订阅时立即产生值,但First()仍应被视为阻塞运算符.Take(1).Subscribe(...)会以非阻塞形式给你相同的行为.但是,您对ConnectableObservable的观察对其他人非常有帮助,所以感谢发布此内容. (3认同)

Sco*_*ein 5

First()返回一个T,而不是一个Observable<T>,所以它必须阻止.

非阻塞形式是 xs.Take(1)