如何与发布和连接共享一个observable?

Why*_*rrh 6 c# system.reactive observable

我有一个可观察的数据流,我正在应用操作,分成两个独立的流,对两个流中的每一个流应用更多(不同的)操作,并再次合并在一起.我正在尝试使用Publish和分享两个订阅者之间的可观察性,Connect但每个订阅者似乎都在使用单独的流.也就是说,在下面的示例中,我看到两个订阅者为流中的每个项目打印一次"执行昂贵的操作" .(想象一下,昂贵的操作是在所有订阅者之间只发生一次的事情,因此我试图重用流.)我已经使用PublishConnect尝试与两个订阅者共享合并的observable,但它似乎有错误影响.

问题示例:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var connectable = Observable.Merge(a, b).Publish();
connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));
connectable.Connect();
Run Code Online (Sandbox Code Playgroud)

我看到以下输出:

Doing expensive operation
Doing expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing expensive operation
Doing expensive operation
Subscriber B got: { Source = B, Value = #1 }
Run Code Online (Sandbox Code Playgroud)

(输出继续,为简洁而截断.)

如何与两个订阅者共享observable?

Eni*_*ity 15

您发布了错误的可观察对象.

使用当前代码,您正在合并,然后像这样发布Observable.Merge(a, b).Publish();.现在,因为a&b是针对定义expensive你仍然得到两个订阅expensive.

订阅会创建这些管道:

原版的

如果你.Publish();从代码中取出,你可以看到这一点.输出变为:

Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Run Code Online (Sandbox Code Playgroud)

这会创建这些管道:

没有发布

因此,通过将.Publish()备份转移到expensive您消除问题.这就是你真正需要它的地方,因为它毕竟是昂贵的操作.

这是您需要的代码:

var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false });
var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler);
var expensive = timer.Select(i =>
{
    // Converting to strings is an expensive operation
    Console.WriteLine("Doing an expensive operation");
    return string.Format("#{0}", i);
});

var connectable = expensive.Publish();

var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s });
var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s });

var merged = Observable.Merge(a, b);

merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s));
merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s));

connectable.Connect();
Run Code Online (Sandbox Code Playgroud)

这很好地产生了以下内容:

Doing an expensive operation
Subscriber A got: { Source = A, Value = #0 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #1 }
Doing an expensive operation
Subscriber A got: { Source = A, Value = #2 }
Doing an expensive operation
Subscriber B got: { Source = B, Value = #3 }
Run Code Online (Sandbox Code Playgroud)

这给你这些管道:

昂贵的发布

您可以从此图像中看到仍然存在重复.这很好,因为这些部件并不昂贵.

复制实际上很重要.管道的共享部分使其端点易受错误影响,从而提前终止.共享越少,代码的健壮性就越好.只有当您进行昂贵的操作时才应该担心发布.否则你应该让管道成为自己.

这是一个展示它的例子.如果您没有已发布的源,那么,如果一个源产生错误,那么它不会下拉所有管道.

分离

但是一旦你引入了一个共享的observable,那么一个错误就会降低所有的管道.

共享