ParallelQuery.Aggregate不能并行运行的可能原因

Szm*_*pie 5 .net c# parallel-processing multithreading plinq

我很感激PLYNQ专家的帮助!我会花时间回顾一下答案,我对math.SE有一个更为确定的概况.

我有一个类型的对象ParallelQuery<List<string>>,它有44个列表,我想并行处理(一次五个,比如说).我的流程有一个签名

private ProcessResult Process(List<string> input)
Run Code Online (Sandbox Code Playgroud)

处理将返回一个结果,这是一对布尔值,如下所示.

    private struct ProcessResult
    {
        public ProcessResult(bool initialised, bool successful)
        {
            ProcessInitialised = initialised;
            ProcessSuccessful = successful;
        }

        public bool ProcessInitialised { get; }
        public bool ProcessSuccessful { get; }
    }
Run Code Online (Sandbox Code Playgroud)

问题.给定一个IEnumerable<List<string>> processMe,我的PLYNQ查询尝试实现此方法:https://msdn.microsoft.com/en-us/library/dd384151(v = vs.110).aspx .它写成

processMe.AsParallel()
         .Aggregate<List<string>, ConcurrentStack<ProcessResult>, ProcessResult>
             (
                 new ConcurrentStack<ProcessResult>,   //aggregator seed
                 (agg, input) =>
                 {                         //updating the aggregate result
                     var res = Process(input);
                     agg.Push(res);
                     return agg;
                 },
                 agg => 
                 {                         //obtain the result from the aggregator agg
                     ProcessResult res;    // (in this case just the most recent result**)
                     agg.TryPop(out res);
                     return res;
                 }
             );
Run Code Online (Sandbox Code Playgroud)

不幸的是,它并不是按顺序运行的.(**请注意,这个实现没有"意义",我只是想让并行化现在起作用.)


我尝试了一个稍微不同的实现,它确实并行运行,但没有聚合.我定义了一个聚合方法(基本上是两个部分的布尔AND ProcessResult,即聚合([A1,A2],[B1,B2])≡[A1 && B1,A2 && B2]).

private static ProcessResult AggregateProcessResults
        (ProcessResult aggregate, ProcessResult latest)
    {
        bool ini = false, suc = false;
        if (aggregate.ProcessInitialised && latest.ProcessInitialised)
            ini = true;
        if (aggregate.ProcessSuccessful && latest.ProcessSuccessful)
            suc = true;


        return new ProcessResult(ini, suc);
    }
Run Code Online (Sandbox Code Playgroud)

并使用了PLYNQ查询https://msdn.microsoft.com/en-us/library/dd383667(v=vs.110).aspx

.Aggregate<List<string>, ProcessResult, ProcessResult>(
    new ProcessResult(true, true),
    (res, input)  => Process(input),
    (agg, latest) => AggregateProcessResults(agg, latest),
    agg           => agg
Run Code Online (Sandbox Code Playgroud)

这里的问题是AggregateProcessResults代码从来没有被打过,出于某种原因 - 我无能为力的结果......

感谢阅读,任何帮助赞赏:)

Evk*_*Evk 4

根据设计,您使用的过载Aggregate确实不会并行运行。您传递种子,然后传递阶跃函数,但阶跃函数 ( ) 的参数是从agg一步接收的累加器。因此,它本质上是顺序的(上一步的结果被输入到下一步)并且不可并行。不知道为什么这个重载包含在 中,但可能是有原因的。ParallelEnumerable

相反,使用另一个重载:

var result = processMe
.AsParallel()
.Aggregate
(
    // seed factory. Each partition will call this to get its own seed
    () => new ConcurrentStack<ProcessResult>(),
    // process element and update accumulator
    (agg, input) =>
    {                                           
        var res = Process(input);
        agg.Push(res);
        return agg;
    },
    // combine accumulators from different partitions
    (agg1, agg2) => {
        agg1.PushRange(agg2.ToArray());
        return agg1;
    },
    // reduce
    agg =>
    {
        ProcessResult res;
        agg.TryPop(out res);
        return res;
    }
);
Run Code Online (Sandbox Code Playgroud)