使用带有被动扩展的Observable.Publish

bra*_*ing 1 .net system.reactive

我对使用Observable.Publish进行多播处理的生命周期感到有些困惑.如何使用正确连接?反对直觉我发现我不需要为多播观察者调用connect来启动他们的订阅.

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?
Run Code Online (Sandbox Code Playgroud)

编辑

我的难题是为什么当我没有在IConnectableObservable显式上调用Connect时我成功订阅了.但是我在IConnectableObservable上调用Await,后者隐式调用Connect

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try

End Function
Run Code Online (Sandbox Code Playgroud)

注意我的TaskDialog在按下取消按钮时公开了一个名为CancelObserved的observable.

该解决方案由@asti发布在链接中.以下是该链接中RX团队的引用

注意使用await会导致订阅发生,从而使可观察序列变热.此版本中包括对IConnectableObservable的支持,这会导致将序列连接到其源以及订阅它.如果没有Connect调用,await操作将永远不会完成

Lee*_*ell 5

发布允许您共享订阅。这显然对于使冷可观察序列变得热最有用。即,采用导致某些订阅副作用(可能是与网络的连接)发生的序列,并确保副作用执行一次,并且序列的结果在消费者之间共享。

在实践中,您可以在冷序列上调用发布,订阅消费者,然后在订阅后连接已发布的序列,以缓解任何竞争条件。

基本上,您在上面所做的事情。

对于已经很热门的序列(例如主题、FromEventPattern 或已经发布和连接的内容)来说,这在很大程度上毫无意义。

处理 Connect() 方法中的值将“断开”序列,从而防止消费者获取更多值。如果其中任何消费者订阅想要提前分离,您还可以处置这些订阅。

说了这么多,您似乎正在做正确的事情。您看到的问题是什么?我假设您正在连接到一个已经很热门的序列。


Ast*_*sti 5

Publish在一个源上返回一个IConnectableObservable<T>基本上IObservable<T>是一个Connect方法.您可以使用ConnectIDisposable返回控制订阅源.

Rx旨在成为一个火灾和遗忘系统.在您明确处置订阅或完成/错误之前,订阅不会终止.

即,disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) - 在disp0, disp1明确处置之前不会终止订阅- 这与组播源的连接无关.

您可以连接和断开连接,而不会打扰下面的管道.不用担心手动管理连接的更简单方法是使用.Publish().RefCount()哪个将维持连接,只要至少有一个观察者仍然订阅它.这被称为预热可观察性.


更新了问题的编辑

OP打电话awaitIConnectableObservable<T>.

Rx的发行说明:

..使用await通过导致订阅发生使可观察序列变热.此版本中包括对IConnectableObservable的支持,这会导致将序列连接到其源以及订阅它.如果没有Connect调用,await操作将永远不会完成.

示例(取自同一页面)

static async  void Foo()
{
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine("Operation started!");
        return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
    });

    var ys = xs.Publish();

    // This doesn't trigger a connection with the source yet.
    ys.Subscribe(x => Console.WriteLine("Value = " + x));

    // During the asynchronous sleep, nothing will be printed.
    await Task.Delay(5000);

    // Awaiting causes the connection to be made. Values will be printed now,
    // and the code below will return 9 after 10 seconds.
    var y =  await ys;
    Console.WriteLine("Await result = " + y);
}
Run Code Online (Sandbox Code Playgroud)