我这里有一个简单的程序,可以显示不同单词中的字母数。它按预期工作。
static void Main(string[] args) {
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub
.GroupJoin(length,
s => wordPub,
s => Observable.Empty<int>(),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => Console.WriteLine($"{i.Word} {i.Length}"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Console.ReadLine();
}
Run Code Online (Sandbox Code Playgroud)
输出是:
Apple 5
Banana 6
Cat 3
Donkey 6
Elephant 8
Zebra 5
Run Code Online (Sandbox Code Playgroud)
我使用了 Publish().RefCount() 因为“wordpub”两次包含在“report”中。没有它,当首先发出一个单词时,报告的一部分将通过回调得到通知,然后报告的另一部分将得到通知,通知加倍。这就是发生的事情;输出最终有 11 个项目而不是 6 个。至少我认为是这样。我认为在这种情况下使用 Publish().RefCount() 作为同时更新报告的两个部分。
但是,如果我将长度函数更改为也使用已发布的源,如下所示:
var length = wordPub.Select(i => i.Length);
Run Code Online (Sandbox Code Playgroud)
然后输出是这样的:
Apple 5
Apple 6
Banana 6
Cat 3
Banana 3
Cat 6
Donkey 6
Elephant 8
Donkey 8
Elephant 5
Zebra 5
Run Code Online (Sandbox Code Playgroud)
为什么长度函数不能也使用相同的发布源?
这是一个需要解决的巨大挑战!发生这种情况的条件是如此微妙。提前为冗长的解释表示歉意,但请耐心等待!
长话短说
对已发布源的订阅按顺序处理,但先于直接对未发布源的任何其他订阅。即你可以插队!对于GroupJoin订阅顺序来说,确定窗口何时打开和关闭非常重要。
我首先担心的是您正在发布重新计数的主题。这应该是一个空操作。
Subject<T>没有订阅费用。
因此,当您删除Publish().RefCount():
var word = new Subject<string>();
var wordPub = word;//.Publish().RefCount();
var length = word.Select(i => i.Length);
Run Code Online (Sandbox Code Playgroud)
然后你会遇到同样的问题。
然后我就看GroupJoin(因为我的直觉表明这Publish().Refcount()是一个转移注意力的事情)。对我来说,仅靠肉眼观察这一点太难以合理化,所以我也依靠简单的调试,多年来我已经使用了数十次 - aTrace或Log扩展方法。
public interface ILogger
{
void Log(string input);
}
public class DumpLogger : ILogger
{
public void Log(string input)
{
//LinqPad `Dump()` extension method.
// Could use Console.Write instead.
input.Dump();
}
}
public static class ObservableLoggingExtensions
{
private static int _index = 0;
public static IObservable<T> Log<T>(this IObservable<T> source, ILogger logger, string name)
{
return Observable.Create<T>(o =>
{
var index = Interlocked.Increment(ref _index);
var label = $"{index:0000}{name}";
logger.Log($"{label}.Subscribe()");
var disposed = Disposable.Create(() => logger.Log($"{label}.Dispose()"));
var subscription = source
.Do(
x => logger.Log($"{label}.OnNext({x.ToString()})"),
ex => logger.Log($"{label}.OnError({ex})"),
() => logger.Log($"{label}.OnCompleted()")
)
.Subscribe(o);
return new CompositeDisposable(subscription, disposed);
});
}
}
Run Code Online (Sandbox Code Playgroud)
当我将日志记录添加到您提供的代码中时,它看起来像这样:
var logger = new DumpLogger();
var word = new Subject<string>();
var wordPub = word.Publish().RefCount();
var length = word.Select(i => i.Length);
var report =
wordPub.Log(logger, "lhs")
.GroupJoin(word.Select(i => i.Length).Log(logger, "rhs"),
s => wordPub.Log(logger, "lhsDuration"),
s => Observable.Empty<int>().Log(logger, "rhsDuration"),
(w, a) => new { Word = w, Lengths = a })
.SelectMany(i => i.Lengths.Select(j => new { Word = i.Word, Length = j }));
report.Subscribe(i => ($"{i.Word} {i.Length}").Dump("OnNext"));
word.OnNext("Apple");
word.OnNext("Banana");
word.OnNext("Cat");
word.OnNext("Donkey");
word.OnNext("Elephant");
word.OnNext("Zebra");
Run Code Online (Sandbox Code Playgroud)
然后,这将在我的日志中输出如下内容
使用 Publish().RefCount() 进行记录
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
...
Run Code Online (Sandbox Code Playgroud)
但是,当我删除该用法时,Publish().RefCount()新的日志输出如下:
仅记录主题
0001lhs.Subscribe()
0002rhs.Subscribe()
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe()
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe()
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
...
Run Code Online (Sandbox Code Playgroud)
这为我们提供了一些见解,但是当我们开始使用逻辑订阅列表来注释日志时,问题才真正变得清晰。
在带有 RefCount 的原始(工作)代码中,我们的注释可能如下所示
//word.Subsribers.Add(wordPub)
0001lhs.Subscribe() //wordPub.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //wordPub.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //wordPub.Subsribers.Add(0005lhsDuration)
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose() //wordPub.Subsribers.Remove(0003lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Banana 6
Run Code Online (Sandbox Code Playgroud)
所以在这个例子中,当word.OnNext("Banana");执行时,观察者链按此顺序链接
然而,wordPub有子订阅!所以真正的订阅列表看起来像
如果我们只注释主题日志,我们就会看到微妙之处
0001lhs.Subscribe() //word.Subsribers.Add(0001lhs)
0002rhs.Subscribe() //word.Subsribers.Add(0002rhs)
0001lhs.OnNext(Apple)
0003lhsDuration.Subscribe() //word.Subsribers.Add(0003lhsDuration)
0002rhs.OnNext(5)
0004rhsDuration.Subscribe()
0004rhsDuration.OnCompleted()
0004rhsDuration.Dispose()
OnNext
Apple 5
0001lhs.OnNext(Banana)
0005lhsDuration.Subscribe() //word.Subsribers.Add(0005lhsDuration)
0002rhs.OnNext(6)
0006rhsDuration.Subscribe()
0006rhsDuration.OnCompleted()
0006rhsDuration.Dispose()
OnNext
Apple 6
OnNext
Banana 6
0003lhsDuration.OnNext(Banana)
0003lhsDuration.Dispose()
Run Code Online (Sandbox Code Playgroud)
所以在这个例子中,当word.OnNext("Banana");执行时,观察者链按此顺序链接
1. 0001lhs
2. 0002rhs
3. 0003lhsDuration
4. 0005lhsDuration
Run Code Online (Sandbox Code Playgroud)
由于0003lhsDuration订阅是在 后激活的0002rhs,因此它不会看到“Banana”值来终止窗口,直到 rhs发送该值之后,从而在仍然打开的窗口中生成它。
呼呼
正如 @francezu13k50 指出的,解决您的问题的明显而简单的解决方案就是使用word.Select(x => new { Word = x, Length = x.Length });,但我认为您已经为我们提供了实际问题的简化版本(赞赏),我理解为什么这不合适。但是,由于我不知道您真正的问题空间是什么,所以我不确定向您建议什么来提供解决方案,除了您当前的代码有一个解决方案,现在您应该知道为什么它会这样工作。
| 归档时间: |
|
| 查看次数: |
148 次 |
| 最近记录: |