反应性扩展似乎非常缓慢 - 我做错了什么?

Chr*_*ebb 13 .net c# system.reactive

我正在评估Rx是否需要每秒处理数千条消息的交易平台项目.现有平台具有复杂的事件路由系统(多播委托),它响应这些消息并执行大量后续处理.

我已经看过Reactive Extensions的显而易见的好处,但发现它有点慢,通常慢100倍.

我已经创建了单元测试来演示这个,它运行一个简单的增量100万次,使用各种Rx风格和直接开箱即用的委托"控制"测试.

结果如下:

Delegate                                 - (1000000) - 00:00:00.0410000
Observable.Range()                       - (1000000) - 00:00:04.8760000
Subject.Subscribe() - NewThread          - (1000000) - 00:00:02.7630000
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:03.0280000
Subject.Subscribe() - Immediate          - (1000000) - 00:00:03.0030000
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:02.9800000
Subject.Subscribe() - Dispatcher         - (1000000) - 00:00:03.0360000
Run Code Online (Sandbox Code Playgroud)

如您所见,所有Rx方法都比委托等效方法慢约100倍.很显然,Rx在一个更复杂的例子中会有很多用处,但这看起来非常慢.

这是正常的还是我的测试假设无效?下面的Nunit代码 -

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using NUnit.Framework;
using System.Concurrency;

namespace RxTests
{
    [TestFixture]
    class ReactiveExtensionsBenchmark_Tests
    {
        private int counter = 0;

        [Test]
        public void ReactiveExtensionsPerformanceComparisons()
        {
            int iterations = 1000000;

            Action<int> a = (i) => { counter++; };

            DelegateSmokeTest(iterations, a);
            ObservableRangeTest(iterations, a);
            SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread");
            SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread");
            SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate");
            SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool");
            SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher");
        }

        public void ObservableRangeTest(int iterations, Action<int> action)
        {
            counter = 0;

            long start = DateTime.Now.Ticks;

            Observable.Range(0, iterations).Subscribe(action);

            OutputTestDuration("Observable.Range()", start);
        }


        public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
        {
            counter = 0;

            var eventSubject = new Subject<int>();
            var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb
            events.Subscribe(action);

            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => eventSubject.OnNext(1)
                );

            OutputTestDuration("Subject.Subscribe() - " + mode, start);
        }

        public void DelegateSmokeTest(int iterations, Action<int> action)
        {
            counter = 0;
            long start = DateTime.Now.Ticks;

            Enumerable.Range(0, iterations).ToList().ForEach
                (
                    a => action(1)
                );

            OutputTestDuration("Delegate", start);
        }


        /// <summary>
        /// Output helper
        /// </summary>
        /// <param name="test"></param>
        /// <param name="duration"></param>
        public void OutputTestDuration(string test, long duration)
        {
            Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration)));
        }

        /// <summary>
        /// Test timing helper
        /// </summary>
        /// <param name="elapsedTicks"></param>
        /// <returns></returns>
        public string ElapsedDuration(long elapsedTicks)
        {
            return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString();
        }

    }
}
Run Code Online (Sandbox Code Playgroud)

dtb*_*dtb 17

我的猜测是,Rx团队专注于首先构建功能,而不关心性能优化.

使用分析器确定瓶颈并使用您自己的优化版本替换慢速Rx类.

以下是两个例子.

结果:

Delegate                                 - (1000000) - 00:00:00.0368748

Simple - NewThread                       - (1000000) - 00:00:00.0207676
Simple - CurrentThread                   - (1000000) - 00:00:00.0214599
Simple - Immediate                       - (1000000) - 00:00:00.0162026
Simple - ThreadPool                      - (1000000) - 00:00:00.0169848

FastSubject.Subscribe() - NewThread      - (1000000) - 00:00:00.0588149
FastSubject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0508842
FastSubject.Subscribe() - Immediate      - (1000000) - 00:00:00.0513911
FastSubject.Subscribe() - ThreadPool     - (1000000) - 00:00:00.0529137

首先,似乎重要的是如何实现可观察性.这是一个无法取消订阅的观察,但速度很快:

private IObservable<int> CreateFastObservable(int iterations)
{
    return Observable.Create<int>(observer =>
    {
        new Thread(_ =>
        {
            for (int i = 0; i < iterations; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
        }).Start();
        return () => { };
    });
}
Run Code Online (Sandbox Code Playgroud)

测试:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = CreateFastObservable(iterations);

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("Simple - " + mode, start);
}
Run Code Online (Sandbox Code Playgroud)

主题增加了很多开销.这是一个主题,它被剥夺了主题所需的大部分功能,但速度很快:

class FastSubject<T> : ISubject<T>
{
    private event Action onCompleted;
    private event Action<Exception> onError;
    private event Action<T> onNext;

    public FastSubject()
    {
        onCompleted += () => { };
        onError += error => { };
        onNext += value => { };
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(T value)
    {
        this.onNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.onCompleted += observer.OnCompleted;
        this.onError += observer.OnError;
        this.onNext += observer.OnNext;

        return Disposable.Create(() =>
        {
            this.onCompleted -= observer.OnCompleted;
            this.onError -= observer.OnError;
            this.onNext -= observer.OnNext;
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

测试:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode)
{
    counter = 0;

    var start = Stopwatch.StartNew();

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount();

    observable.SubscribeOn(scheduler).Run(action);

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start);
}
Run Code Online (Sandbox Code Playgroud)

  • Observable.Range有一个重载,调度程序作为参数,所以,这为我返回(使用Scheduler.Immediate)00:00:00.6698080:public void SimpleObserveTest(int iterations,Action <int> action,IScheduler scheduler,string mode ){counter = 0; var start = Stopwatch.StartNew(); Observable.Range(0,iterations,scheduler).Run(action); OutputTestDuration("简单 - "+模式,开始); } (2认同)

hko*_*hko 12

更新Rx 2.0:我从原始帖子中获取了代码(几乎)最新的Linqpad beta 4.42.04(好吧有06,但无论如何): Rx主要组件

...并稍微调整一下以使用新的Rx v2调度程序语法:

        public void ReactiveExtensionsPerformanceComparisons()
    {
        int iterations = 1000000;

        Action<int> a = (i) => { counter++; };

        DelegateSmokeTest(iterations, a);
        ObservableRangeTest(iterations, a);
        SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread");
        SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread");
        SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate");
        SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool");
        // I *think* this is the same as the ThreadPool scheduler in my case
        SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");                
        // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete
        //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool");
    }
Run Code Online (Sandbox Code Playgroud)

注意:结果差别很大,在极少数情况下,Threadpool会击败newThread,但在大多数情况下,NewThread在列表中位于其下方的调度程序之上略有优势:

Delegate                                 - (1000000) - 00:00:00.0440025
Observable.Range()                       - (1000000) - 00:00:01.9251101
Subject.Subscribe() - NewThread          - (1000000) - 00:00:00.0400023
Subject.Subscribe() - CurrentThread      - (1000000) - 00:00:00.0530030
Subject.Subscribe() - Immediate          - (1000000) - 00:00:00.0490028
Subject.Subscribe() - ThreadPool         - (1000000) - 00:00:00.0490028
Subject.Subscribe() - Default            - (1000000) - 00:00:00.0480028
Run Code Online (Sandbox Code Playgroud)

所以看起来他们的表现非常努力.


Ana*_*tts 10

请记住,您的Delegate不保证任何线程安全 - 它从调用它的任何线程调用委托,而当您调用Observable.ObserveOn将通知发送到其他线程时,Rx.NET必须执行锁定以确保它执行你认为它做什么.

因此,代表们可能会超级快速地移动,但是如果你想用它来构建一些实用的东西,你将最终建立同步手动,这将减慢你的速度.话虽这么说,Rx,就像LINQ一样,是一种抽象 - 如果你需要它快得离谱,你必须开始编写丑陋的代码.