Rx TaskPoolScheduler vs EventLoopScheduler,内存使用情况

Ale*_*sas 9 reactive-programming system.reactive c#-4.0

我正在尝试对众多独立数据源进行POC.经典观察者风格应用的排序.数据馈送的数量可能从几百到几千不等,观察者的数量可能会有所不同,从2到20000.以下是简单数据馈送可观察模型的快速示例:

    public class FeedMockUp
    {
        private readonly IScheduler observerScheduler;
        private readonly Random rnd = new Random((int)DateTime.Now.Ticks);

        private readonly Subject<double> sourceObservable;
        private readonly IObservable<double> testedObservable;

        public FeedMockUp(IScheduler observerScheduler)
        {
            this.observerScheduler = observerScheduler;
            sourceObservable = new Subject<double>();

            testedObservable =
                Observable.Create<double>(x =>
                {
                    var underlyingSourceDisposable =
                        sourceObservable
                            .Subscribe(_ => x.OnNext(rnd.NextDouble()));
                    return underlyingSourceDisposable;
                });
        }

        public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
        {
            int counter = 0;
            var disposable = new CompositeDisposable();

            for (int i = 0; i < numberOfSubscribers; i++)
            {
                disposable.Add(testedObservable
                    .ObserveOn(observerScheduler)
                    .Subscribe(_ => Interlocked.Increment(ref counter)));
            }

            return disposable;
        }

        public void PushNewFeed()
        {
            sourceObservable.OnNext(rnd.NextDouble());
        }
    }    
Run Code Online (Sandbox Code Playgroud)

虽然为了提高可观察量的更新吞吐量,我正在和玩家一起玩,但我注意到虽然使用EventLoopScheduler具有1000个观察者的100个数据馈送的应用程序的内存消耗是相当稳定的,但对于1000个观察者来说它是~100Mb并且当线性增长时添加新的观察者.

然而,当我尝试使用TaskPoolScheduler时,在x86进程中我开始获得OutOfMemoryException异常,并且在x64进程内存消耗爆炸,或者更确切地说,变得非常不确定,范围从1Gb到2Gb,仅500名观察者,并且随着新观察者的增长几乎呈指数级增长混合.

这是我用于测试的代码.你能看出它有什么问题吗?为什么性能如此差异?猜猜,这里有一些内部复制/排队,但这只是我的猜测.理想情况下,我想知道这里发生了什么,而没有潜入RX代码库......

    private static void Main(string[] args)
    {
        const int displayItemCount = 100;
        const int callbackCount = 500;

        //var rtScheduler = new EventLoopScheduler(); 
        var rtScheduler = TaskPoolScheduler.Default;
        var rtFeeds = new List<FeedMockUp>();
        for (int i = 0; i < displayItemCount; i++)
        {
            var mockFeed = new FeedMockUp(rtScheduler);
            mockFeed.SubscribeToUnderlyingFeed(callbackCount);
            rtFeeds.Add(mockFeed);
        }
        foreach (var rtFeedMockUp in rtFeeds)
        {
            rtFeedMockUp.PushNewFeed();
        }
        Console.WriteLine("Memory used for feed {0} mockups with {1} observers / callbacks. Memory {2} Mb",
            displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
Console.ReadKey();

}
Run Code Online (Sandbox Code Playgroud)

Bra*_*don 15

使用ObserveOnwith TaskPoolScheduler本质上是为每个观察者创建一个LongRunning任务.

默认情况下,TaskScheduler最终会为每个任务创建一个ThreadLongRunning.

每个线程的堆栈大约保留1MB.

因此,500名观察员使用该TaskPoolScheduler预留至少500MB.你可以看到这是怎么回事......

EventLoopScheduler,而另一方面,运行在一个单独的线程.因此ObserveOn,有效地使用此调度程序只需在调度程序的工作队列中添加一个条目.此条目比Thread的1MB成本小得多.

因此,EventLoopScheduler对于这种情况来说,它的内存效率要高得多,但它也会连续地通知观察者,如果有很多观察者并且源生成频率很高,那么您将开始积累未发送事件的缓冲区.

TaskPoolScheduler是更少的内存效率,但同时会通知观察者,因此可以潜在地处理比频率较高的事件EventLoopScheduler,利用所有内核的机器上.

  • 您还可以为每个内核创建一个“EventLoopScheduler”并在入口点上实现负载均衡器;例如,循环调度程序选择。这可以包装在它自己的“IScheduler”实现中,这样你只需要传递一个引用。 (2认同)

Tia*_*ago 8

你可能想用TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning)).如果你不介意失去并行性,EventLoopScheduler是一个很好的选择.

如果您仍希望并行执行工作但希望使用线程池线程,则此选项更可取.