fah*_*ash 4 c# multithreading system.reactive
我有以下观察
IObservable<Work> observable = SomeMethodToGetWorkToDo();
Run Code Online (Sandbox Code Playgroud)
每次OnNext调用上面的内容时,我都需要在一个单独的线程中完成工作.每项工作都需要很长时间才能完成,所以我不能让Work队列中的其他项目等待有足够的系统资源.
我认为ObserveOn可以解决我的问题,但是当我运行几个Console.WriteLine调用来查看线程ID时,我看到每个通知调用都有相同的线程ID.
我怎样才能确保并行性OnNext?
您需要将"要做的工作"转换为一系列"工作正在完成"的令牌,然后再收集这些令牌.最简单的方法是使用TPL作为任务执行工作(最终将在线程池上运行任务).像这样的东西:
observable
.SelectMany(work => Task.Run(() => DoWork(work))
.Subscribe(workResult => Console.WriteLine("a work item was completed"));
Run Code Online (Sandbox Code Playgroud)