C# Rx Observable 产生随机结果

Ste*_*ric 1 c# system.reactive observable

考虑以下程序;

class Program
{
    static IObservable<int> GetNumbers()
    {
        var observable = Observable.Empty<int>();
        foreach (var i in Enumerable.Range(1, 10))
        {
            observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Producing {i}");
                Thread.Sleep(1000);
                return i;
            })));
        }

        return observable;
    }

    static async Task LogNumbers(IObservable<int> observable)
    {
        var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
        await observable;
        subscription.Dispose();
    }

    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}
Run Code Online (Sandbox Code Playgroud)

它产生以下输出

Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished
Run Code Online (Sandbox Code Playgroud)

它写出每条“生产 x”语句和一条“消耗 x”语句。为什么要这样做?为什么它从来没有写出预期的最终“Consuming 10”语句?

Gid*_*rth 5

您将获得两份生产线副本,因为您订阅了两次。最有可能的是,您没有获得消耗的 10,因为当第二个订阅结束时第一个订阅被取消。如果您有时确实得到 Consuming 10,我不会感到惊讶,只是因为当时任务以不同的顺序运行。

static async Task LogNumbers(IObservable<int> observable)
{
    //This is the first subscription
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));

    //This is the second subscription
    await observable;

    subscription.Dispose();
}
Run Code Online (Sandbox Code Playgroud)

按照函数的编写方式GetNumbers,对可观察对象的每个订阅都将触发其自己的 10 个任务集运行,从而触发其自己的输出集。第一个订阅还监视生成的值并输出消耗行。第二个订阅对生成的值没有任何作用,因为您没有使用await observable,但确实导致第二组任务运行。

您可以通过使用Publish().RefCount()LogNumbers 的参数或使用 TaskCompletionSource 并从当前在第一个订阅中未使用的 OnError 和 OnComplete 函数将其标记为完成来消除第二个订阅。这些看起来像这样:

static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
    observable = observable.Publish().RefCount();
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
    await observable;
    subscription.Dispose();
}

static async Task LogNumbersTCS(IObservable<int> observable)
{
    var t = new TaskCompletionSource<object>()
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
                       ex => t.TrySetException(ex),
                       () => t.TrySetResult(null));
    return t.Task;
}
Run Code Online (Sandbox Code Playgroud)