bra*_*ing 1 .net system.reactive
我对使用Observable.Publish进行多播处理的生命周期感到有些困惑.如何使用正确连接?反对直觉我发现我不需要为多播观察者调用connect来启动他们的订阅.
var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);
// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()
// Does calling
disposable.Dispose();
// unsubscribe field0 and field1?
Run Code Online (Sandbox Code Playgroud)
编辑
我的难题是为什么当我没有在IConnectableObservable显式上调用Connect时我成功订阅了.但是我在IConnectableObservable上调用Await,后者隐式调用Connect
Public Async Function MonitorMeasurements() As Task
Dim cts = New CancellationTokenSource
Try
Using dialog = New TaskDialog(Of Unit)(cts)
Dim measurementPoints =
MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
TakeUntil(dialog.CancelObserved).Publish()
Dim viewModel = New MeasurementViewModel(measurementPoints)
dialog.Content = New MeasurementControl(viewModel)
dialog.Show()
Await measurementPoints
End Using
Catch ex As TimeoutException
MessageBox.Show(ex.Message)
Catch ex As Exception
MessageBox.Show(ex.Message)
End Try
End Function
Run Code Online (Sandbox Code Playgroud)
注意我的TaskDialog在按下取消按钮时公开了一个名为CancelObserved的observable.
解
该解决方案由@asti发布在链接中.以下是该链接中RX团队的引用
注意使用await会导致订阅发生,从而使可观察序列变热.此版本中包括对IConnectableObservable的支持,这会导致将序列连接到其源以及订阅它.如果没有Connect调用,await操作将永远不会完成
发布允许您共享订阅。这显然对于使冷可观察序列变得热最有用。即,采用导致某些订阅副作用(可能是与网络的连接)发生的序列,并确保副作用执行一次,并且序列的结果在消费者之间共享。
在实践中,您可以在冷序列上调用发布,订阅消费者,然后在订阅后连接已发布的序列,以缓解任何竞争条件。
基本上,您在上面所做的事情。
对于已经很热门的序列(例如主题、FromEventPattern 或已经发布和连接的内容)来说,这在很大程度上毫无意义。
处理 Connect() 方法中的值将“断开”序列,从而防止消费者获取更多值。如果其中任何消费者订阅想要提前分离,您还可以处置这些订阅。
说了这么多,您似乎正在做正确的事情。您看到的问题是什么?我假设您正在连接到一个已经很热门的序列。
Publish在一个源上返回一个IConnectableObservable<T>基本上IObservable<T>是一个Connect方法.您可以使用Connect和IDisposable返回控制订阅源.
Rx旨在成为一个火灾和遗忘系统.在您明确处置订阅或完成/错误之前,订阅不会终止.
即,disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) - 在disp0, disp1明确处置之前不会终止订阅- 这与组播源的连接无关.
您可以连接和断开连接,而不会打扰下面的管道.不用担心手动管理连接的更简单方法是使用.Publish().RefCount()哪个将维持连接,只要至少有一个观察者仍然订阅它.这被称为预热可观察性.
更新了问题的编辑
OP打电话await的IConnectableObservable<T>.
..使用await通过导致订阅发生使可观察序列变热.此版本中包括对IConnectableObservable的支持,这会导致将序列连接到其源以及订阅它.如果没有Connect调用,await操作将永远不会完成.
示例(取自同一页面)
static async void Foo()
{
var xs = Observable.Defer(() =>
{
Console.WriteLine("Operation started!");
return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
});
var ys = xs.Publish();
// This doesn't trigger a connection with the source yet.
ys.Subscribe(x => Console.WriteLine("Value = " + x));
// During the asynchronous sleep, nothing will be printed.
await Task.Delay(5000);
// Awaiting causes the connection to be made. Values will be printed now,
// and the code below will return 9 after 10 seconds.
var y = await ys;
Console.WriteLine("Await result = " + y);
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4545 次 |
| 最近记录: |