Chr*_*oph 2 c# reactive-programming system.reactive observable
既然我在Observable这里感冒了,我多次订阅"分组",为什么我不需要在这里发布?当我运行它时,我会期望它会带来不必要的结果,但令我惊讶的是它可以使用和不使用Publish.这是为什么?
var subject = new List<string>
{
"test",
"test",
"hallo",
"test",
"hallo"
}.ToObservable();
subject
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
// I Would have expect that I need to use Publish like that
//subject
// .GroupBy(x => x)
// .SelectMany(grouped => grouped.Publish(sharedGroup =>
// sharedGroup.Scan(0, (count, _) => ++count)
// .Zip(sharedGroup, (count, chars) =>
// new { Chars = chars, Count = count })))
// .Subscribe(result => Console.WriteLine("You typed {0} {1} times",
// result.Chars, result.Count));
Console.ReadLine();
Run Code Online (Sandbox Code Playgroud)
编辑
保罗注意到,因为我们订阅了两次潜在的冷观察,我们应该重复两次.但是,我没有运气使这个效果可见.我试图插入调试行,但例如这只打印"执行"一次.
var subject = new List<Func<string>>
{
() =>
{
Console.WriteLine("performing");
return "test";
},
() => "test",
() => "hallo",
() => "test",
() => "hallo"
}.ToObservable();
subject
.Select(x => x())
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
Run Code Online (Sandbox Code Playgroud)
我想知道我们是否可以看到我们处理冷可观察并且没有使用的效果Publish().在另一个步骤中,我想看看如何Publish()(见上文)使效果消失.
编辑2
正如Paul所说,我IObservable<string>为调试目的创建了一个自定义.但是,如果你在它的Subscribe()方法中设置一个断点,你会注意到它只会被击中一次.
class Program
{
static void Main(string[] args)
{
var subject = new MyObservable();
subject
.GroupBy(x => x)
.SelectMany(grouped => grouped.Scan(0, (count, _) => ++count)
.Zip(grouped, (count, chars) => new { Chars = chars, Count = count }))
.Subscribe(result => Console.WriteLine("You typed {0} {1} times",
result.Chars, result.Count));
Console.ReadLine();
}
}
class MyObservable : IObservable<string>
{
public IDisposable Subscribe(IObserver<string> observer)
{
observer.OnNext("test");
observer.OnNext("test");
observer.OnNext("hallo");
observer.OnNext("test");
observer.OnNext("hallo");
return Disposable.Empty;
}
}
Run Code Online (Sandbox Code Playgroud)
所以对我来说问题仍然存在.为什么我Publish这里不需要感冒Observable?
小智 6
您只使用基于列表的源一次,因此您不会在那里看到重复的订阅效果.回答您问题的关键是以下观察:
从GroupBy流出的IGroupedObservable <K,T>对象本身就是伪装的主题.
在内部,GroupBy保持Dictionary <K,ISubject <T >>.每当有消息进入时,它就会被相应的密钥发送到主题中.您正在为分组对象订阅两次,这是安全的,因为主题将生产者与消费者分离.