并行Linq查询优化

dav*_*k01 15 c# linq parallel-processing plinq c#-4.0

一段时间以来,我一直围绕没有副作用的方法构建我的代码,以便使用并行linq来加快速度.一路走来,我不止一次偶然发现懒惰的评估让事情变得更糟而不是更好,我想知道是否有任何工具可以帮助优化并行linq查询.

我问,因为我最近通过修改某些方法并AsParallel在某些关键位置加油来重构一些令人尴尬的并行代码.运行时间从2分钟下降到45秒,但从性能监视器可以清楚地看出,有些地方CPU上的所有内核都没有得到充分利用.在一些错误的启动后,我强制执行一些查询,ToArray并且运行时间进一步降低到16秒.减少代码的运行时间感觉很好,但它也有点令人不安,因为不清楚需要强制使用代码查询的位置ToArray.等到查询执行的最后一分钟不是最佳策略,但是根本不清楚代码中的哪些点需要强制某些子查询才能利用所有CPU内核.

因为它是我不知道如何正确胡椒ToArray或其他方法迫使linq计算执行,以获得最大的CPU利用率.那么优化并行linq查询是否有任何通用指南和工具?

这是一个伪代码示例:

var firstQuery = someDictionary.SelectMany(FirstTransformation);
var secondQuery = firstQuery.Select(SecondTransformation);
var thirdQuery = secondQuery.Select(ThirdTransformation).Where(SomeConditionCheck);
var finalQuery = thirdQuery.Select(FinalTransformation).Where(x => x != null);
Run Code Online (Sandbox Code Playgroud)

FirstTransformation,SecondTransformation,ThirdTransformation都是CPU绑定,并且在复杂性方面,他们有几个3x3矩阵乘法和一些if分支机构.SomeConditionCheck几乎是一张null支票.FinalTransformation是代码中CPU密集度最高的部分,因为它将执行一大堆线平面交叉,并检查这些交叉点的​​多边形包含,然后提取最接近线上某个点的交点.

我不知道为什么我放置的地方AsParallel减少了代码的运行时间.我现在已经达到了运行时间的局部最小值,但我不知道为什么.我偶然发现它只是运气不好.如果你想知道放置的地方AsParallel是第一行和最后一行.放在AsParallel其他地方只会增加运行时间,有时甚至会增加20秒.ToArray第一行还有隐藏的藏身之处.

Joe*_*men 16

这里有几件事情:

  1. PLINQ比不计数的IEnumerables更有效地并行化集合.如果您有一个数组,它会将数组长度除以CPU核心数并将它们均匀地分配出来.但是如果你有一个长度未知的IEnumerable,它会做一个愚蠢的指数斜升类型的事情,其中​​任务将一次处理1,2,4,8等元素,直到它到达IEnumerable的末尾.
  2. 通过并行化所有查询,您可以将工作分解为很小的块.如果跨N个元素有M个并行查询,则最终会得到M*N个任务.这里有更多的线程开销,比如你只是并行化最后一个查询,在这种情况下,你最终只有N个任务.
  3. 当每个任务花费大致相同的处理时间时,PLINQ的效果最佳.这样它可以在核心之间平均分配它们.通过并行化具有不同性能行为的每个查询,您有M*N个任务需要不同的时间,PLINQ无法以最佳方式安排它们(因为它不能提前知道每个可能需要多长时间).

所以这里的总体指导原则是:确保在开始之前,如果可能的话,你有一个数组,并且在评估之前只将AsParallel放在最后一个查询上.所以像下面这样的东西应该很好用:

var firstQuery = someDictionary.SelectMany().ToArray().Select(FirstTransformation);
var secondQuery = firstQuery.Select(SecondTransformation);
var thirdQuery = secondQuery.Select(ThirdTransformation).AsParallel().Where(SomeConditionCheck).ToArray();
var finalQuery = thirdQuery.Select(FinalTransformation).AsParallel().Where(x => x != null);
Run Code Online (Sandbox Code Playgroud)


Alo*_*aus 7

没有看到实际的代码几乎是不可能的.但作为一般准则,您应该考虑在复数数字运算期间避免P/LINQ,因为委托和IEnumerable开销太高.使用线程获得的速度很可能被LINQ提供的方便抽象所吞噬.

这里有一些代码可以计算2个整数列表的总和,对某些int进行浮点数比较,然后计算它的cos.使用LINQ .Zip运算符可以很好地完成基本的东西......或者使用for循环的老式方法.

在我的Haswell 8核心机器上使用更新的ParallelLinq更新1

  • Linq 0,95s
  • Linq Parallel 0,19s
  • 优化0,45s
  • 优化并行0,08s

更新1结束

  • LINQ 1,65s
  • 优化0,64s
  • 优化的并行0,40s

由于IEnumerable懒惰和方法调用开销(我确实使用了发布模式x32 Windows 7,.NET 4双核),时间差几乎是因子3.我试图在LINQ版本中使用AsParallel,但实际上它实际上变慢了(2,3s).如果您是数据驱动的,您应该使用Parallel.For构造来获得良好的可扩展性.IEnumerable本身就是并行化的不良候选者

  • 在你进行枚举直到结束之前,你不知道你有多少工作.
  • 您不能进行急切的分块,因为您不知道IEnumerable将返回下一个元素的速度有多快(可能是Web服务调用或数组索引访问).

下面是一个代码示例来说明这一点.如果你想对裸机进行更多优化,首先需要摆脱每件物品成本太高的抽象.与非内联的MoveNext()和Current方法调用相比,数组访问要便宜得多.

    class Program
    {
        static void Main(string[] args)
        {
            var A = new List<int>(Enumerable.Range(0, 10*1000*1000));
            var B = new List<int>(Enumerable.Range(0, 10*1000*1000));

            double[] Actual = UseLinq(A, B);
            double[] pActual = UseLinqParallel(A, B);

            var other = Optimized(A, B);
            var pother = OptimizedParallel(A, B);
        }

        private static double[] UseLinq(List<int> A, List<int> B)
        {
            var sw = Stopwatch.StartNew();
            var Merged = A.Zip(B, (a, b) => a + b);
            var Converted = A.Select(a => (float)a);

            var Result = Merged.Zip(Converted, (m, c) => Math.Cos((double)m / ((double)c + 1)));

            double[] Actual = Result.ToArray();
            sw.Stop();

            Console.WriteLine("Linq {0:F2}s", sw.Elapsed.TotalSeconds);
            return Actual;
        }

    private static double[] UseLinqParallel(List<int> A, List<int> B)
    {
        var sw = Stopwatch.StartNew();
        var x = A.AsParallel();
        var y = B.AsParallel();

        var Merged = x.Zip(y, (a, b) => a + b);
        var Converted = x.Select(a => (float)a);

        var Result = Merged.Zip(Converted, (m, c) => Math.Cos((double)m / ((double)c + 1)));

        double[] Actual = Result.ToArray();
        sw.Stop();

        Console.WriteLine("Linq Parallel {0:F2}s", sw.Elapsed.TotalSeconds);
        return Actual;
    }        

        private static double[] OptimizedParallel(List<int> A, List<int> B)
        {
            double[] result = new double[A.Count];
            var sw = Stopwatch.StartNew();
            Parallel.For(0, A.Count, (i) =>
            {
                var sum = A[i] + B[i];
                result[i] = Math.Cos((double)sum / ((double)((float)A[i]) + 1));
            });
            sw.Stop();

            Console.WriteLine("Optimized Parallel {0:F2}s", sw.Elapsed.TotalSeconds);
            return result;
        }

        private static double[] Optimized(List<int> A, List<int> B)
        {
            double[] result = new double[A.Count];
            var sw = Stopwatch.StartNew();
            for(int i=0;i<A.Count;i++)
            {
                var sum = A[i] + B[i];
                result[i] = Math.Cos((double)sum / ((double)((float)A[i]) + 1));
            }
            sw.Stop();

            Console.WriteLine("Optimized {0:F2}s", sw.Elapsed.TotalSeconds);
            return result;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)