Parallel ForEach的本地初始化如何工作?

Lui*_*rao 13 c# task-parallel-library

我不确定在Parallel.ForEach中使用本地init函数,如msdn文章中所述:http://msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
   () => 0, // method to initialize the local variable
   (j, loop, subtotal) => // method invoked by the loop on each iteration
   {
      subtotal += nums[j]; //modify local variable 
      return subtotal; // value to be passed to next iteration
   },...
Run Code Online (Sandbox Code Playgroud)

()=> 0如何初始化任何东西?变量的名称是什么,如何在循环逻辑中使用它?

Stu*_*tLC 27

参照以下过载的的Parallel.ForEach静态扩展方法:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource,?ParallelLoopState,?TLocal,?TLocal> taskBody,
    Action<TLocal> localFinally
)
Run Code Online (Sandbox Code Playgroud)

在您的具体示例中

这条线:

() => 0, // method to initialize the local variable
Run Code Online (Sandbox Code Playgroud)

只是一个lambda(匿名函数),它将返回常量整数零.这个lambda作为localInit参数传递给Parallel.ForEach- 由于lambda返回一个整数,它的类型Func<int>和类型TLocal可以int由编译器推断出来(类似地,TSource可以从作为参数传递的集合的类型推断出来source)

然后将返回值(0)作为第3个参数(命名subtotal)传递给taskBody Func.这个(0)用于身体循环的初始种子:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable (Bad idea, see comment)
    return subtotal;     // value to be passed to next iteration
}
Run Code Online (Sandbox Code Playgroud)

第二个lambda(传递给taskBody)称为N次,其中N是TPL分区器分配给该任务的项目数.

对第二个taskBodylambda的每次后续调用都将传递此任务的新值subTotal,有效地计算运行的部分总数.在添加了分配给此任务的所有项目之后,localFinally将再次调用第三个和最后一个函数参数,并传递subtotal返回的最终值taskBody.由于几个此类任务将并行运行,因此还需要最后一步将所有部分总计加到最终的"总计"总数中.但是,因为多个并发任务(在不同的线程上)可以争用grandTotal变量,所以对它的更改以线程安全的方式完成是很重要的.

(我已经更改了MSDN变量的名称以使其更清晰)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
     subtotal + nums[j],          // value to be passed to next iteration subtotal
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)
Run Code Online (Sandbox Code Playgroud)

在MS示例中,修改任务主体内部的参数小计是一种不好的做法,而且是不必要的.即代码subtotal += nums[j]; return subtotal;会更好,因为return subtotal + nums[j];它可以缩写为lambda速记投影(j, loop, subtotal) => subtotal + nums[j]

一般来说

Parallel.For/Parallel.ForEachlocalInit / body / localFinally重载允许在任务执行迭代之前和之后(分别)运行每次任务初始化和清理代码.taskBody

(注意到For range/Enumerable传递给parallel For/ Foreach将被分成批次IEnumerable<>,每个都将被分配一个Task)

每个任务中,localInit将被调用一次,body代码将被重复调用,批处理(0..N次)每个项目一次,并localFinally在完成后调用一次.

此外,您可以通过- 我在下面调用此变量的通用返回值传递任务持续时间所需的任何状态(即到taskBodylocalFinally委托).TLocallocalInit FunctaskLocals

"localInit"的常见用法:

  • 创建和初始化循环体所需的昂贵资源,如数据库连接或Web服务连接.
  • 保持任务 - 本地变量保持(无竞争)运行总计或集合
  • 如果你需要将多个对象返回localInittaskBodylocalFinally,你可以使用强类型的类,Tuple<,,>或者,如果你只使用lambdas localInit / taskBody / localFinally,你也可以通过匿名类传递数据.请注意,如果您使用return from localInit在多个任务之间共享引用类型,则需要考虑此对象的线程安全性 - 不可变性更可取.

"localFinally"行动的常见用法:

  • 为了释放资源,例如IDisposables在使用taskLocals(比如数据库连接,文件处理,Web服务客户端等)
  • 将每个任务完成的工作聚合/组合/减少回共享变量.这些共享变量将被争用,因此线程安全性是一个问题:
    • 例如Interlocked.Increment在整数等基本类型上
    • lock 写操作需要或类似
    • 利用并发集合来节省时间和精力.

taskBodytight循环操作的一部分 - 您需要优化它以提高性能.

这是用评论的例子总结的最好的总结:

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task.
    () => 
    {
       // Local `task` variables have no contention 
       // since each Task can never run by multiple threads concurrently
       var sqlConnection = new SqlConnection("connstring...");
       sqlConnection.Open();

       // This is the `task local` state we wish to carry for the duration of the task
       return new 
       { 
          Conn = sqlConnection,
          RunningTotal = 0
       }
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
      // ... Do some fancy Sql work here on our task's independent connection
      using(var command = taskLocals.Conn.CreateCommand())
      using(var reader = command.ExecuteReader(...))
      {
        if (reader.Read())
        {
           // No contention for `taskLocal`
           taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
        }
      }
      // The same type of our `taskLocal` param must be returned from the body
      return taskLocals;
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
       // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
       if (taskLocals.Conn != null)
         taskLocals.Conn.Dispose();

       // Do any reduce / aggregate / synchronisation work.
       // NB : There is contention here!
       Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    }
Run Code Online (Sandbox Code Playgroud)

还有更多例子:

每任务无竞争词典的示例

每任务数据库连接的示例


Dea*_*bit 7

作为@Honza Brestan答案的延伸.Parallel foreach将工作分成任务的方式也很重要,它会将几个循环迭代分组到一个任务中,因此实际上localInit()每循环n次迭代都会调用一次,并且可以同时启动多个组.

的一个点localInit,并localFinally为确保并行foreach循环可以组合每个itteration结果到一个结果没有你需要在指定锁的语句body,要做到这一点,你必须为你要创建的值初始化(localInit)然后每个bodyitteration都可以处理本地值,然后提供一种方法localFinally,以线程安全的方式组合每个group()的值.

如果您不需要localInit来同步任务,则可以使用lambda方法正常引用周围上下文中的值而不会出现任何问题.见线程在C#(的Parallel.For和Parallel.ForEach)深入教程更使用localInit /最后,向下滚动到与本地值优化,约瑟夫阿尔巴哈利真的是我的一切事物线程转到源.