Jim*_*mmy 6 c# plinq system.reactive
我有一些无限的生成器方法,包括一些长时间运行和无限长时间运行的生成器.
IEnumerable<T> ExampleOne() {
while(true) // this one blocks for a few seconds at a time
yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() {
while(true) //this one blocks for a really long time
yield return OtherLongRunningFunction();
}
Run Code Online (Sandbox Code Playgroud)
我的目标是拥有一个无限序列,它结合了两个例子中的项目.这是我尝试过的,使用PLINQ:
IEnumerable<T> combined = new[] { ExampleOne(), ExampleTwo() }
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.SelectMany(source => source.GetRequests());
Run Code Online (Sandbox Code Playgroud)
这似乎恰当地将两个IEnumerables组合成一个新的,IEnumerable只要#1和#2中的项目出现在两个源中的任何一个中,它们就可用IEnumerables:
//assuming ExampleTwo yields TWO but happens roughly 5 times
//less often then ExampleOne
Example output: one one one one one TWO one one one one one one TWO
Run Code Online (Sandbox Code Playgroud)
然而,似乎有时候(通常经过几个小时的运行)OtherLongRunningFunction()会长时间没有返回,并且在难以重现的条件下,combined序列将阻塞它而不是继续从第一个返回结果LongRunningFunction.看起来虽然组合的并行查询开始使用两个线程,但它决定稍后切换到一个线程.
我的第一个想法是"这可能是RX的工作Observable.Merge而不是PLINQ." 但我很欣赏这两个答案,它们显示了处理这种情况的正确替代方法,以及有关PLINQ如何在查询开始后几小时内改变并行度的机制的解释.
这是 Rx 的方法,事实上,它确实使用了Merge:
IObservable<T> LongRunningFunction()
{
return Observable.Start(() => {
// Calculate some stuff
return blah;
}, Scheduler.TaskPoolScheduler);
}
Observable.Merge(
Observable.Defer(LongRunningFunction).Repeat(),
Observable.Defer(OtherLongRunningFunction).Repeat(),
).Subscribe(x => {
Console.WriteLine("An item: {0}", x);
});
Run Code Online (Sandbox Code Playgroud)