对 Publish().Refcount() 行为的困惑

Jus*_*inM 5 system.reactive

我这里有一个简单的程序,可以显示不同单词中的字母数。它按预期工作。

static void Main(string[] args) {
    var word = new Subject<string>();
    var wordPub = word.Publish().RefCount();
    var length = word.Select(i => i.Length);
    var report =
        wordPub
        .GroupJoin(length,
            s => wordPub,
            s => Observable.Empty<int>(),
            (w, a) => new { Word = w, Lengths = a })
        .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
    report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
    word.OnNext("Apple");
    word.OnNext("Banana");
    word.OnNext("Cat");
    word.OnNext("Donkey");
    word.OnNext("Elephant");
    word.OnNext("Zebra");
    Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)

输出是:

Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
Run Code Online (Sandbox Code Playgroud)

我使用了 Publish().RefCount() 因为“wordpub”两次包含在“report”中。没有它,当首先发出一个单词时,报告的一部分将通过回调得到通知,然后报告的另一部分将得到通知,通知加倍。这就是发生的事情;输出最终有 11 个项目而不是 6 个。至少我认为是这样。我认为在这种情况下使用 Publish().RefCount() 作为同时更新报告的两个部分。

但是,如果我将长度函数更改为也使用已发布的源,如下所示:

var length = wordPub.Select(i => i.Length);
Run Code Online (Sandbox Code Playgroud)

然后输出是这样的:

Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
Run Code Online (Sandbox Code Playgroud)

为什么长度函数不能也使用相同的发布源?

Lee*_*ell 3

这是一个需要解决的巨大挑战!发生这种情况的条件是如此微妙。提前为冗长的解释表示歉意,但请耐心等待!

长话短说

对已发布源的订阅按顺序处理,但先于直接对未发布源的任何其他订阅。即你可以插队!对于GroupJoin订阅顺序来说,确定窗口何时打开和关闭非常重要。


我首先担心的是您正在发布重新计数的主题。这应该是一个空操作。 Subject<T>没有订阅费用。

因此,当您删除Publish().RefCount()

var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
Run Code Online (Sandbox Code Playgroud)

然后你会遇到同样的问题。

然后我就看GroupJoin(因为我的直觉表明这Publish().Refcount()是一个转移注意力的事情)。对我来说,仅靠肉眼观察这一点太难以合理化,所以我也依靠简单的调试,多年来我已经使用了数十次 - aTraceLog扩展方法。

public interface ILogger
{
    void Log(string input);
}
public class DumpLogger : ILogger
{
    public void Log(string input)
    {
        //LinqPad `Dump()` extension method. 
        //  Could use Console.Write instead.
        input.Dump();
    }
}


public static class ObservableLoggingExtensions
{
    private static int _index = 0;

    public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
    {
        return Observable.Create<T>(o =>
        {
            var index = Interlocked.Increment(ref _index);
            var label = $"{index:0000}{name}";
            logger.Log($"{label}.Subscribe()");
            var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
            var subscription = source
                .Do(
                    x => logger.Log($"{label}.OnNext({x.ToString()})"),
                    ex => logger.Log($"{label}.OnError({ex})"),
                    () => logger.Log($"{label}.OnCompleted()")
                )
                .Subscribe(o);

            return new CompositeDisposable(subscription, disposed);
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

当我将日志记录添加到您提供的代码中时,它看起来像这样:

var logger = new DumpLogger();

var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
    wordPub.Log(logger, "lhs")
    .GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
        s => wordPub.Log(logger, "lhsDuration"),
        s => Observable.Empty<int>().Log(logger, "rhsDuration"),
        (w, a) => new { Word = w, Lengths = a })
    .SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Run Code Online (Sandbox Code Playgroud)

然后,这将在我的日志中输出如下内容

使用 Publish().RefCount() 进行记录

0001lhs.Subscribe()             
0002rhs.Subscribe()             
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 
...
Run Code Online (Sandbox Code Playgroud)

但是,当我删除该用法时,Publish().RefCount()新的日志输出如下:

仅记录主题

0001lhs.Subscribe()                 
0002rhs.Subscribe()                 
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()         
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()         
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6 

    OnNext
    Banana 6 

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
Run Code Online (Sandbox Code Playgroud)

这为我们提供了一些见解,但是当我们开始使用逻辑订阅列表来注释日志时,问题才真正变得清晰。

在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示

//word.Subsribers.Add(wordPub)

0001lhs.Subscribe()             //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe()             //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()     //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()       //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Banana 6 
Run Code Online (Sandbox Code Playgroud)

所以在这个例子中,当word.OnNext("Banana");执行时,观察者链按此顺序链接

  1. 词库
  2. 0002rhs

然而wordPub有子订阅!所以真正的订阅列表看起来像

  1. 词库
    1. 0001lhs
    2. 0003lhs持续时间
    3. 0005lhs持续时间
  2. 0002rhs

如果我们只注释主题日志,我们就会看到微妙之处

0001lhs.Subscribe()                 //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe()                 //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()         //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()

    OnNext
    Apple 5 

0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()         //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()

    OnNext
    Apple 6 

    OnNext
    Banana 6 

0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
Run Code Online (Sandbox Code Playgroud)

所以在这个例子中,当word.OnNext("Banana");执行时,观察者链按此顺序链接

1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
Run Code Online (Sandbox Code Playgroud)

由于0003lhsDuration订阅是在 后激活的0002rhs,因此它不会看到“Banana”值来终止窗口,直到 rhs发送该值之后,从而在仍然打开的窗口中生成它。

呼呼

正如 @francezu13k50 指出的,解决您的问题的明显而简单的解决方案就是使用word.Select(x => new { Word = x, Length = x.Length });,但我认为您已经为我们提供了实际问题的简化版本(赞赏),我理解为什么这不合适。但是,由于我不知道您真正的问题空间是什么,所以我不确定向您建议什么来提供解决方案,除了您当前的代码有一个解决方案,现在您应该知道为什么它会这样工作。