在方法上创建一个observable

Gau*_*dar 5 c# system.reactive .net-core

在我的应用程序中,我有一个方法,每次在服务器上的某个队列上发生更新时都会调用该方法.应用程序初始化为以这种方式运行.

现在,每次使用最新数据调用该方法时,我都希望将其视为事件流的一部分,从而使其成为永远不会以订阅者结束的Observable的一部分.

我面临的挑战是:如何在被调用的方法上创建一个observable?以下是我的示例代码.

//This method is invoked every time an update happens on the server
public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    Observable.Create<MyObject3>(observer =>
        {
            var object3 = new MyObject3(object1, object2);
            observer.OnNext(object3 );

            return Disposable.Empty;
        })
        .Subscribe(x => WriteLine($"Message acknowledged"));
}
Run Code Online (Sandbox Code Playgroud)

但是这会在每次调用方法时创建一个可观察对象而不是我想要的东西,而且它看起来都不是正确的方法.我还读到使用"主题"或"AsyncSubject"不是解决问题的正确方法.

Eni*_*ity 3

不使用的规则Subject更多的是一个尚未得到很好表达的指南。

一般来说,如果您在可观察管道使用主题,那么您可能会做错一些事情 - 这是应该避免的。

如果您使用 aSubject作为可观察量的源,并且正确封装了 aSubject 对其进行了混淆,那么就可以了。因此,这通常意味着使用一个private只有您的代码可以访问的字段(因此没有人可以调用.OnCompleted()它并调用.AsObservable(),以便没有人可以将您的可观察值转换回底层Subject.

在您的情况下,您直接订阅,因此.AsObservable()不需要,但我怀疑这只是演示代码。在现实世界中,请确保你混淆了。

您的代码应如下所示:

private Subject<MyObject3> _subject = new Subject<MyObject3>();

private void SetUpObservable()
{
    _subject = new Subject<MyObject3>();
    _subject.Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _subject.OnNext(new MyObject3(object1, object2));
}
Run Code Online (Sandbox Code Playgroud)

现在,如果您仍然想避免,Subject那么您可以这样做作为替代方案:

private Action<MyObject3> _delegate;

private void SetUpObservable()
{
    Observable
        .FromEvent<MyObject3>(h => _delegate += h, h => _delegate -= h)
        .Subscribe(x => Console.WriteLine($"Message acknowledged"));
}

public virtual void MessageHandler(MyObject1 object1, MyObject2 object2)
{
    _delegate?.Invoke(new MyObject3(object1, object2));
}
Run Code Online (Sandbox Code Playgroud)

在我看来,这Subject可以让您更好地控制并且更容易设置。

无论如何,您可能应该保留订阅IDisposable,以便可以正确清理。