为什么 IEnumerable.ToObservable 这么慢?

The*_*ias 10 c# system.reactive rx.net

我想列举一个大IEnumerable过一次,并观察附有各运营商(枚举CountSumAverage等)。显而易见的方法是IObservable使用 method将其转换为 an ToObservable,然后为它订阅观察者。我注意到这比其他方法慢得多,比如做一个简单的循环并在每次迭代时通知观察者,或者使用Observable.Create方法而不是ToObservable. 差异很大:它慢了 20-30 倍。就是这样,还是我做错了什么?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}
Run Code Online (Sandbox Code Playgroud)

输出:

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}
Run Code Online (Sandbox Code Playgroud)

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release built


更新:这是我想要实现的实际功能的示例:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");
Run Code Online (Sandbox Code Playgroud)

输出:

计数:10,000,000,总和:49,999,995,000,000,平均:4,999,999.5

与使用标准LINQ运算符相比,此方法的重要区别在于源可枚举项仅被枚举一次。


另一项观察: usingToObservable(Scheduler.Immediate)ToObservable().

Eni*_*ity 6

这就是表现良好的 observable 和“滚动你自己的,因为你认为更快,更好,但它不是”可观察之间的区别。

当你深入到源代码中时,你会发现这条可爱的小线:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));
Run Code Online (Sandbox Code Playgroud)

hasNext = enumerator.MoveNext();每个预定的递归迭代有效地调用一次。

这允许您为您的.ToObservable(schedulerOfYourChoice)呼叫选择调度程序。

使用您选择的其他选项,您创建了一系列.OnNext几乎没有任何作用的直接调用。Method2甚至没有.Subscribe电话。

这两个Method2Method1认购完成之前,使用当前线程都运行到完成运行。他们正在阻止呼叫。它们会导致竞争条件。

Method1是唯一一个表现良好的可观察对象。它是异步的,可以独立于订阅者运行。

请记住,可观察对象是随时间运行的集合。它们通常具有异步源或计时器或对外部刺激的响应。他们不会经常跑掉一个简单的可枚举。如果您正在使用可枚举,那么同步工作应该会运行得更快。

速度不是 Rx 的目标。目标是对基于时间的推送值执行复杂查询。

  • “自己动手,因为你认为更快更好,但事实并非如此”——太棒了! (2认同)