Ale*_*sas 9 reactive-programming system.reactive c#-4.0
我正在尝试对众多独立数据源进行POC.经典观察者风格应用的排序.数据馈送的数量可能从几百到几千不等,观察者的数量可能会有所不同,从2到20000.以下是简单数据馈送可观察模型的快速示例:
public class FeedMockUp
{
private readonly IScheduler observerScheduler;
private readonly Random rnd = new Random((int)DateTime.Now.Ticks);
private readonly Subject<double> sourceObservable;
private readonly IObservable<double> testedObservable;
public FeedMockUp(IScheduler observerScheduler)
{
this.observerScheduler = observerScheduler;
sourceObservable = new Subject<double>();
testedObservable =
Observable.Create<double>(x =>
{
var underlyingSourceDisposable =
sourceObservable
.Subscribe(_ => x.OnNext(rnd.NextDouble()));
return underlyingSourceDisposable;
});
}
public IDisposable SubscribeToUnderlyingFeed(int numberOfSubscribers)
{
int counter = 0;
var disposable = new CompositeDisposable();
for (int i = 0; i < numberOfSubscribers; i++)
{
disposable.Add(testedObservable
.ObserveOn(observerScheduler)
.Subscribe(_ => Interlocked.Increment(ref counter)));
}
return disposable;
}
public void PushNewFeed()
{
sourceObservable.OnNext(rnd.NextDouble());
}
}
Run Code Online (Sandbox Code Playgroud)
虽然为了提高可观察量的更新吞吐量,我正在和玩家一起玩,但我注意到虽然使用EventLoopScheduler具有1000个观察者的100个数据馈送的应用程序的内存消耗是相当稳定的,但对于1000个观察者来说它是~100Mb并且当线性增长时添加新的观察者.
然而,当我尝试使用TaskPoolScheduler时,在x86进程中我开始获得OutOfMemoryException异常,并且在x64进程内存消耗爆炸,或者更确切地说,变得非常不确定,范围从1Gb到2Gb,仅500名观察者,并且随着新观察者的增长几乎呈指数级增长混合.
这是我用于测试的代码.你能看出它有什么问题吗?为什么性能如此差异?猜猜,这里有一些内部复制/排队,但这只是我的猜测.理想情况下,我想知道这里发生了什么,而没有潜入RX代码库......
private static void Main(string[] args)
{
const int displayItemCount = 100;
const int callbackCount = 500;
//var rtScheduler = new EventLoopScheduler();
var rtScheduler = TaskPoolScheduler.Default;
var rtFeeds = new List<FeedMockUp>();
for (int i = 0; i < displayItemCount; i++)
{
var mockFeed = new FeedMockUp(rtScheduler);
mockFeed.SubscribeToUnderlyingFeed(callbackCount);
rtFeeds.Add(mockFeed);
}
foreach (var rtFeedMockUp in rtFeeds)
{
rtFeedMockUp.PushNewFeed();
}
Console.WriteLine("Memory used for feed {0} mockups with {1} observers / callbacks. Memory {2} Mb",
displayItemCount, callbackCount, Environment.WorkingSet / (1024 * 1024));
Console.ReadKey();
}
Run Code Online (Sandbox Code Playgroud)
Bra*_*don 15
使用ObserveOnwith TaskPoolScheduler本质上是为每个观察者创建一个LongRunning任务.
默认情况下,TaskScheduler最终会为每个任务创建一个ThreadLongRunning.
每个线程的堆栈大约保留1MB.
因此,500名观察员使用该TaskPoolScheduler预留至少500MB.你可以看到这是怎么回事......
的EventLoopScheduler,而另一方面,运行在一个单独的线程.因此ObserveOn,有效地使用此调度程序只需在调度程序的工作队列中添加一个条目.此条目比Thread的1MB成本小得多.
因此,EventLoopScheduler对于这种情况来说,它的内存效率要高得多,但它也会连续地通知观察者,如果有很多观察者并且源生成频率很高,那么您将开始积累未发送事件的缓冲区.
的TaskPoolScheduler是更少的内存效率,但同时会通知观察者,因此可以潜在地处理比频率较高的事件EventLoopScheduler,利用所有内核的机器上.
你可能想用TaskPoolScheduler.Default.DisableOptimizations(typeof(ISchedulerLongRunning)).如果你不介意失去并行性,EventLoopScheduler是一个很好的选择.
如果您仍希望并行执行工作但希望使用线程池线程,则此选项更可取.
| 归档时间: |
|
| 查看次数: |
3699 次 |
| 最近记录: |