试图在长时间运行的发电机上使用PLINQ的陷阱?

Jim*_*mmy 6 c# plinq system.reactive

我有一些无限的生成器方法,包括一些长时间运行和无限长时间运行的生成器.

IEnumerable<T> ExampleOne() { 
    while(true) // this one blocks for a few seconds at a time
        yield return LongRunningFunction();
}
IEnumerable<T> ExampleTwo() { 
    while(true) //this one blocks for a really long time
        yield return OtherLongRunningFunction();
}
Run Code Online (Sandbox Code Playgroud)

我的目标是拥有一个无限序列,它结合了两个例子中的项目.这是我尝试过的,使用PLINQ:

 IEnumerable<T> combined =  new[] { ExampleOne(), ExampleTwo() }
           .AsParallel()
           .WithMergeOptions(ParallelMergeOptions.NotBuffered)
           .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
           .SelectMany(source => source.GetRequests());
Run Code Online (Sandbox Code Playgroud)

这似乎恰当地将两个IEnumerables组合成一个新的,IEnumerable只要#1和#2中的项目出现在两个源中的任何一个中,它们就可用IEnumerables:

//assuming ExampleTwo yields TWO but happens roughly 5 times 
//less often then ExampleOne
Example output:  one one one one one TWO one one one one one one TWO
Run Code Online (Sandbox Code Playgroud)

然而,似乎有时候(通常经过几个小时的运行)OtherLongRunningFunction()会长时间没有返回,并且在难以重现的条件下,combined序列将阻塞它而不是继续从第一个返回结果LongRunningFunction.看起来虽然组合的并行查询开始使用两个线程,但它决定稍后切换到一个线程.

我的第一个想法是"这可能是RX的工作Observable.Merge而不是PLINQ." 但我很欣赏这两个答案,它们显示了处理这种情况的正确替代方法,以及有关PLINQ如何在查询开始后几小时内改变并行度的机制的解释.

Ana*_*tts 2

这是 Rx 的方法,事实上,它确实使用了Merge

IObservable<T> LongRunningFunction()
{
    return Observable.Start(() => {
        // Calculate some stuff
        return blah;
    }, Scheduler.TaskPoolScheduler);
}

Observable.Merge(
    Observable.Defer(LongRunningFunction).Repeat(),
    Observable.Defer(OtherLongRunningFunction).Repeat(),
).Subscribe(x => {
    Console.WriteLine("An item: {0}", x);
});
Run Code Online (Sandbox Code Playgroud)