使用Parallel Linq Extensions结合两个序列,如何才能首先产生最快的结果?

Ale*_*ffe 9 .net c# parallel-processing parallel-extensions plinq

假设我有两个序列返回整数1到5.

第一个返回1,2和3非常快,但4和5每个需要200ms.

public static IEnumerable<int> FastFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i > 3) Thread.Sleep(200);
        yield return i;
    }
}
Run Code Online (Sandbox Code Playgroud)

第二个返回1,2和3,延迟时间为200ms,但快速返回4和5.

public static IEnumerable<int> SlowFirst()
{
    for (int i = 1; i < 6; i++)
    {
        if (i < 4) Thread.Sleep(200);
        yield return i;
    }
}
Run Code Online (Sandbox Code Playgroud)

联合这两个序列只给出数字1到5.

FastFirst().Union(SlowFirst());
Run Code Online (Sandbox Code Playgroud)

我不能保证两种方法中的哪一种在什么时候有延迟,所以执行的顺序不能保证为我提供解决方案.因此,我想将联盟并行化,以便最小化我的例子中的(人为的)延迟.

一个真实场景:我有一个返回一些实体的缓存,以及一个返回所有实体的数据源.我希望能够从一个方法返回一个迭代器,该方法将请求内部并行化到缓存和数据源,以便缓存的结果尽可能快地生成.

注1:我意识到这仍然在浪费CPU周期; 我不是问我怎么能阻止序列迭代它们的慢元素,我怎么能尽可能快地结合它们.

更新1:我已经定制了achitaka-san对接受多个生成器的响应,并使用ContinueWhenAll将BlockingCollection的CompleteAdding设置为一次.我只是把它放在这里,因为它会因缺少注释格式而丢失.任何进一步的反馈都会很棒!

public static IEnumerable<TResult> SelectAsync<TResult>(
    params IEnumerable<TResult>[] producer)
{
    var resultsQueue = new BlockingCollection<TResult>();

    var taskList = new HashSet<Task>();
    foreach (var result in producer)
    {
        taskList.Add(
            Task.Factory.StartNew(
                () =>
                    {
                        foreach (var product in result)
                        {
                            resultsQueue.Add(product);
                        }
                    }));
    }

    Task.Factory.ContinueWhenAll(taskList.ToArray(), x => resultsQueue.CompleteAdding());

    return resultsQueue.GetConsumingEnumerable();
}
Run Code Online (Sandbox Code Playgroud)

Geo*_*dze 3

看看这个。第一个方法只是按照结果的顺序返回所有内容。第二个检查唯一性。如果你把它们链接起来,我想你会得到你想要的结果。

public static class Class1
{
    public static IEnumerable<TResult> SelectAsync<TResult>(
        IEnumerable<TResult> producer1,
        IEnumerable<TResult> producer2,
        int capacity)
    {
        var resultsQueue = new BlockingCollection<TResult>(capacity);
        var producer1Done = false;
        var producer2Done = false;

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer1)
            {
                resultsQueue.Add(product);
            }
            producer1Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        Task.Factory.StartNew(() =>
        {
            foreach (var product in producer2)
            {
                resultsQueue.Add(product);
            }
            producer2Done = true;
            if (producer1Done && producer2Done) { resultsQueue.CompleteAdding(); }
        });

        return resultsQueue.GetConsumingEnumerable();
    }


    public static IEnumerable<TResult> SelectAsyncUnique<TResult>(this IEnumerable<TResult> source)
    {
        HashSet<TResult> knownResults = new HashSet<TResult>();
        foreach (TResult result in source)
        {
            if (knownResults.Contains(result)) {continue;}
            knownResults.Add(result);
            yield return result;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)