为什么在Parallel.ForEach中每个线程多次调用localInit Func

Mik*_*ike 7 .net c# task-parallel-library .net-4.5

我正在编写一些代码来处理大量数据,我认为让Parallel.ForEach为它创建的每个线程创建一个文件是有用的,因此输出不需要同步(至少由我来).

它看起来像这样:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());
Run Code Online (Sandbox Code Playgroud)

我期望发生的是,最多可以创建8个文件并在整个运行时间内持续存在.然后在整个ForEach调用结束时,每个都将被处理.真正发生的是localInit似乎每个项目都被调用一次,所以我最终得到了数百个文件.作者也被处理在每个处理项目的末尾.

这表明发生了同样的事情:

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));
Run Code Online (Sandbox Code Playgroud)

我知道了:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18
Run Code Online (Sandbox Code Playgroud)

注意:如果我省略Thread.Sleep调用,它有时似乎"正确"运行.对于它决定在我的电脑上使用的4个线程,localInit只被调用一次.然而,并非每次都如此.

这是函数的期望行为吗?幕后发生了什么导致它这样做?最后,什么是获得我想要的功能的好方法,ThreadLocal?

顺便说一句,这是在.NET 4.5上.

Chr*_*ens 7

Parallel.ForEach不像你想象的那样工作.需要注意的是该方法是建立在顶部是非常重要的Task类和之间的关系TaskThread不是1:1.例如,您可以拥有在2个托管线程上运行的10个任务.

尝试在方法体中使用此行而不是当前行:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
Run Code Online (Sandbox Code Playgroud)

您应该看到ThreadId它将在许多不同的任务中重用,由它们的唯一ID显示.如果你离开或增加了你的电话,你会看到更多Thread.Sleep.

关于该Parallel.ForEach方法如何工作的(非常)基本思想是,它使您的枚举创建了一系列将运行枚举的过程部分的任务,这样做的方式很大程度上取决于输入.还有一些特殊的逻辑可以检查任务超过一定毫秒数而不完成的情况.如果这种情况属实,则可能会产生新任务以帮助减轻工作量.

如果您查看localinit函数的文档Parallel.ForEach,您会注意到它表示它returns the initial state of the local data for each _task_,而不是每个线程.

您可能会问为什么生成的任务超过8个.答案类似于上一篇,在文档中找到ParallelOptions.MaxDegreeOfParallelism.

MaxDegreeOfParallelism从默认值更改仅限制将使用多少并发任务.

此限制仅适用于并发任务的数量,而不是在整个处理期间将创建的任务数量的硬限制.正如我上面提到的,有时会产生一个单独的任务,这会导致您的localinit函数被多次调用并将数百个文件写入磁盘.

写入磁盘肯定是一种具有一点延迟的操作,尤其是在使用同步I/O时.当磁盘操作发生时,它会阻塞整个线程; 同样的事情发生在Thread.Sleep.如果a Task执行此操作,它将阻止当前正在运行的线程,并且不能在其上运行任何其他任务.通常在这些情况下,调度程序将生成一个新的Task以帮助获得松弛.

最后,什么是获得我想要的功能的好方法,ThreadLocal?

底线是线程本地化没有意义,Parallel.ForEach因为你没有处理线程; 你正在处理任务.可以在任务之间共享本地线程,因为许多任务可以同时使用同一个线程.此外,任务的线程本地可以改变执行中期,因为调度程序可以抢占它,然后继续执行不同的线程,这将在本地具有不同的线程.

我不确定最好的方法,但你可以依靠localinit函数传递你想要的任何资源,只允许一次在一个线程中使用资源.您可以使用localfinally它将其标记为不再使用,因此可用于获取其他任务.这就是那些方法的设计目的; 每个方法仅在每个生成的任务中调用一次(请参阅Parallel.ForEachMSDN文档的备注部分).

您也可以自己拆分工作,并创建自己的一组线程并运行您的工作.然而,在我看来,这不是一个想法,因为Parallel班级已经为你做了这个繁重的工作.