The*_*ias 2 c# system.reactive rx.net
最近,我偶然发现Enigmativity 关于和运算符的一个有趣的声明:PublishRefCount
您正在使用危险的 .Publish().RefCount() 运算符对,它会创建一个在完成后无法订阅的序列。
这一说法似乎与李·坎贝尔对这些经营者的评价相悖。引用他的书Intro to Rx:
Publish/RefCount 对对于获取冷可观察值并将其作为热可观察序列共享给后续观察者非常有用。
起初我不相信Enigmativity的说法是正确的,所以我试图反驳它。我的实验表明,这Publish().RefCount()确实可能不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,也可能不会,具体取决于连接时源序列是否已完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。以下是此行为的演示:
var observable = Observable
.Create<int>(o =>
{
o.OnNext(13);
o.OnCompleted(); // Commenting this line alters the observed behavior
return Disposable.Empty;
})
.Do(x => Console.WriteLine($"Producer generated: {x}"))
.Finally(() => Console.WriteLine($"Producer finished"))
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"Consumer received #{x}"))
.Finally(() => Console.WriteLine($"Consumer finished"));
observable.Subscribe().Dispose();
observable.Subscribe().Dispose();
Run Code Online (Sandbox Code Playgroud)
在此示例中,observable由三部分组成。首先是生成部分,生成单个值然后完成。然后是发布机制(Publish+ RefCount)。最后是消费部分,观察生产者发出的值。已observable订阅两次。预期的行为是每个订阅都会收到一个值。但事实并非如此!这是输出:
var observable = Observable
.Create<int>(o =>
{
o.OnNext(13);
o.OnCompleted(); // Commenting this line alters the observed behavior
return Disposable.Empty;
})
.Do(x => Console.WriteLine($"Producer generated: {x}"))
.Finally(() => Console.WriteLine($"Producer finished"))
.Publish()
.RefCount()
.Do(x => Console.WriteLine($"Consumer received #{x}"))
.Finally(() => Console.WriteLine($"Consumer finished"));
observable.Subscribe().Dispose();
observable.Subscribe().Dispose();
Run Code Online (Sandbox Code Playgroud)
如果我们注释掉该行,这就是输出o.OnCompleted();。这种微妙的变化导致了预期和理想的行为:
Producer generated: 13
Consumer received #13
Producer finished
Consumer finished
Consumer finished
Run Code Online (Sandbox Code Playgroud)
在第一种情况下,冷生产者( 之前的部分Publish().RefCount())仅被订阅一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(通知除外OnCompleted)。在第二种情况下,制作人被订阅了两次。每产生一个值,每个消费者就得到一个值。
我的问题是:我们该如何解决这个问题?我们如何修改Publish运算符或RefCount或两者,以使它们的行为始终一致且符合要求?以下是理想行为的规范:
我要求PublishRefCount提供上述功能的自定义运算符,或者使用内置运算符实现所需功能的方法。
顺便说一句,存在类似的问题,询问为什么会发生这种情况。我的问题是如何解决它。
更新:回想起来,上述规范导致了不稳定的行为,使得竞争条件不可避免。无法保证对已发布序列的两次订阅将导致对源序列的单个订阅。源序列可能在两个订阅之间完成,导致第一个订阅者取消订阅,导致运营商取消订阅RefCount,导致下一个订阅者对源进行新的订阅。内置的行为.Publish().RefCount()可以防止这种情况发生。
道德教训是.Publish().RefCount()序列没有被破坏,但它不可重用。它不能可靠地用于多个连接/断开连接会话。如果您想要第二次会话,您应该创建一个新.Publish().RefCount()序列。
Lee解释得很好IConnectableObservable,但Publish解释得不太好。这是一种非常简单的动物,很难解释。我假设你明白IConnectableObservable:
如果我们简单而懒惰地重新实现零参数Publish函数,它将看起来像这样:
// For illustrative purposes only: don't use this code
public class PublishObservable<T> : IConnectableObservable<T>
{
private readonly IObservable<T> _source;
private readonly Subject<T> _proxy = new Subject<T>();
private IDisposable _connection;
public PublishObservable(IObservable<T> source)
{
_source = source;
}
public IDisposable Connect()
{
if(_connection == null)
_connection = _source.Subscribe(_proxy);
var disposable = Disposable.Create(() =>
{
_connection.Dispose();
_connection = null;
});
return _connection;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var _subscription = _proxy.Subscribe(observer);
return _subscription;
}
}
public static class X
{
public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
return new PublishObservable<T>(source);
}
}
Run Code Online (Sandbox Code Playgroud)
Publish创建一个Subject订阅源可观察对象的代理。代理可以根据连接订阅/取消订阅源: Call Connect,代理订阅源。调用Dispose一次性连接,代理从源取消订阅。从中得出的重要结论是,有一个代理Subject可以代理与源的任何连接。不保证您只有一份源订阅,但保证您有一个代理和一个并发连接。您可以通过连接/断开连接来进行多个订阅。
RefCount处理事物的调用Connect部分:这是一个简单的重新实现:
// For illustrative purposes only: don't use this code
public class RefCountObservable<T> : IObservable<T>
{
private readonly IConnectableObservable<T> _source;
private IDisposable _connection;
private int _refCount = 0;
public RefCountObservable(IConnectableObservable<T> source)
{
_source = source;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = _source.Subscribe(observer);
var disposable = Disposable.Create(() =>
{
subscription.Dispose();
DecrementCount();
});
if(++_refCount == 1)
_connection = _source.Connect();
return disposable;
}
private void DecrementCount()
{
if(--_refCount == 0)
_connection.Dispose();
}
}
public static class X
{
public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source)
{
return new RefCountObservable<T>(source);
}
}
Run Code Online (Sandbox Code Playgroud)
代码有点多,但仍然非常简单:如果引用计数上升到 1,则调用Connect,ConnectableObservable如果下降到 0,则断开连接。
将两者放在一起,您会得到一对,保证只有一个对源可观察对象的并发订阅,通过一个持久性代理Subject。Subject当下游订阅数量大于 0 时,只会订阅源。
看了上面的介绍,你的问题中有很多误解,所以我会一一回顾一下:
... Publish().RefCount() 确实可能不一致。第二次订阅已发布的序列可能会导致对源序列的新订阅,也可能不会,具体取决于连接时源序列是否已完成。如果已完成,则不会重新订阅。如果未完成,则将重新订阅。
.Publish().RefCount()仅在一种情况下重新订阅源:当订阅者从 0 变为 1 时。如果订阅者计数因任何原因从 0 变为 1 变为 0 变为 1 ,那么您最终将重新订阅。源可观察量完成将导致RefCount发出OnCompleted,并且其所有观察者取消订阅。因此后续订阅RefCount将触发重新订阅源的尝试。当然,如果源正确遵守可观察合约,它将OnCompleted立即发布,仅此而已。
[请参阅 OnCompleted 示例 observable...] observable 被订阅两次。预期的行为是每个订阅都会收到一个值。
不会。预期的行为是代理Subject在发出 an 后将OnCompleted重新发出 anOnCompleted到任何后续订阅尝试。由于您的源可观察量在第一个订阅结束时同步完成,因此第二个订阅将尝试订阅Subject已经发出OnCompleted. 这应该会导致OnCompleted,否则 Observable 合约就会被破坏。
[参见没有 OnCompleted 的示例 observable 作为第二种情况...] 在第一种情况下,冷生产者(Publish().RefCount() 之前的部分)仅订阅一次。第一个消费者收到了发出的值,但第二个消费者什么也没收到(除了 OnCompleted 通知)。在第二种情况下,制作人被订阅了两次。每产生一个值,每个消费者就得到一个值。
这是对的。由于代理Subject从未完成,后续对源的重新订阅将导致冷可观察的重新运行。
我的问题是:我们该如何解决这个问题?[..]
- 发布的序列应该向其订阅者传播直接来自源序列的所有通知,而不是其他任何通知。
- 当发布的序列的当前订阅者数量从零增加到一时,它应该订阅源序列。
- 只要发布的序列至少有一个订阅者,就应该与源保持连接。
- 当发布的序列的当前订阅者数量变为零时,应从源取消订阅。
.Publish只要您不完成/错误,上述所有情况当前都会.RefCount发生。我不建议实现一个改变这一点的操作符,从而破坏 Observable 合约。
编辑:
我认为 Rx 混乱的第一大来源是热/冷可观察量。由于Publish可以“预热”冷的可观测值,因此它会导致令人困惑的边缘情况也就不足为奇了。
首先,介绍一下可观察合约。Observable 合约更简洁的表述是, anOnNext永远不能跟在OnCompleted/后面OnError,并且应该只有一个OnCompleted 或 OnError通知。这确实留下了尝试订阅终止的可观察量的边缘情况:尝试订阅终止的可观察量会导致立即收到终止消息。这会破坏合同吗?也许吧,但据我所知,这是图书馆里唯一的合同作弊。另一种选择是订阅死气沉沉的空气。这对任何人都没有帮助。
这如何与热/冷可观测值联系起来?不幸的是,令人困惑。对冰冷的可观察量的订阅会触发整个可观察量管道的重建。这意味着订阅已终止的规则仅适用于热可观察量。冷可观测量总是重新开始。
考虑这段代码,其中o有一个冷可观察的:
var o = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(5);
var s1 = o.Subscribe(i => Console.WriteLine(i.ToString()));
await Task.Delay(TimeSpan.FromMilliseconds(600));
var s2 = o.Subscribe(i => Console.WriteLine(i.ToString()));
Run Code Online (Sandbox Code Playgroud)
就合约而言,可观察的背后s1和可观察的背后s2是完全不同的。因此,即使它们之间存在延迟,并且您最终会看到OnNext,OnCompleted但这不是问题,因为它们是完全不同的可观察量。
它的粘性在于热身Publish版本。如果您要添加到上面代码的.Publish().RefCount()末尾...o
s2将立即终止不打印任何内容。s2打印最后两个数字。s1为 only .Take(2),并且s2将重新开始打印 0 到 4。雪丁格猫效应让这种情况变得更糟:如果你设置一个观察者来o观察整个过程中会发生什么,就会改变引用计数,影响功能!观看它会改变行为。调试噩梦。
这就是尝试“加热”冷可观测量的危险。它只是效果不佳,尤其是与Publish/RefCount.
我的建议是:
Publish版本的一般规则Publish/RefCount可观察对象进行虚拟订阅。这至少提供了一致的Refcount >= 1,减少了量子活动效应。