Observable中的通知并行性

fah*_*ash 4 c# multithreading system.reactive

我有以下观察

IObservable<Work> observable = SomeMethodToGetWorkToDo();
Run Code Online (Sandbox Code Playgroud)

每次OnNext调用上面的内容时,我都需要在一个单独的线程中完成工作.每项工作都需要很长时间才能完成,所以我不能让Work队列中的其他项目等待有足够的系统资源.

我认为ObserveOn可以解决我的问题,但是当我运行几个Console.WriteLine调用来查看线程ID时,我看到每个通知调用都有相同的线程ID.

我怎样才能确保并行性OnNext

Bra*_*don 5

您需要将"要做的工作"转换为一系列"工作正在完成"的令牌,然后再收集这些令牌.最简单的方法是使用TPL作为任务执行工作(最终将在线程池上运行任务).像这样的东西:

observable
    .SelectMany(work => Task.Run(() => DoWork(work))
    .Subscribe(workResult => Console.WriteLine("a work item was completed"));
Run Code Online (Sandbox Code Playgroud)

  • `SelectMany`将每个工作项转换为`Task <TResult>`,然后将每个任务解包为`TResult`,这样你最终得到一个`IObservable <TResult>`流.使用`Select`,您将拥有一个`IObservable <Task <TResult >>`流. (2认同)
  • 我建议使用`Observable.Start`而不是`Task.Run`,因为最好留在Rx中来回切换来自TPL. (2认同)