使用Reactive Extensions进行异步队列处理

Jef*_*eff 7 .net system.reactive

有几篇关于此的文章,我有这个工作...但我想知道如何为我的Observable订阅一次设置最大数量的任务线程.

我有以下内容来并行化异步保存日志条目:

private BlockingCollection<ILogEntry> logEntryQueue;
Run Code Online (Sandbox Code Playgroud)

 logEntryQueue = new BlockingCollection<ILogEntry>();
 logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry);
Run Code Online (Sandbox Code Playgroud)

要安排我的保存...但是如何指定调度程序一次使用的最大线程数?

And*_*mes 9

这不是Observable的功能,而是Scheduler的功能.Observable定义了什么,调度程序定义了哪里.

您需要传入自定义调度程序.执行此操作的一种简单方法是将TaskScheduler子类化并覆盖"MaximumConcurrencyLevel"属性.

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

我实际上在MSDN上找到了这个样本:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

编辑:您询问了如何从TaskScheduler转到IScheduler.另一位开发人员给了我一些信息:

var ischedulerForRx = new TaskPoolScheduler
(
    new TaskFactory
    (
        //This is your custom scheduler
        new LimitedConcurrencyLevelTaskScheduler(1)
    )
);
Run Code Online (Sandbox Code Playgroud)