TPL Parallel.ForEach 中的每线程实例对象

Yel*_*yev 2 c# nhibernate multithreading task-parallel-library

是否有一种 TPL 语法允许您将对象从池中注入到任务中,以便一个对象一次只能被一个线程使用?或者甚至更好 - 仅由同一个线程使用?

使用示例

假设我想创建 10 个线程来打开 10 个文件:1.txt, 2.txt, 3.txt...10.txt并随机将 500 000 个后续数字写入这些文件。

我可以做这个:

ConcurrentQueue<int> objs = new ConcurrentQueue<int>(); // 500000 numbers go here
Task[] tasks = Enumerable.Range(1, 10)
    .Select(i =>
    {
        return Task.Factory.StartNew(() => 
        {
            using (var f = File.Open($"{i}.txt"))
            {
                using (var wr = StreamWriter(f))
                {
                    while (objs.TryDequeue(out int obj))
                    {
                        wr.WriteLine(obj);
                    }
                }
            }
        }
    })
    .ToArray();
Task.WaitAll(tasks);
Run Code Online (Sandbox Code Playgroud)

但是,是否可以仅使用 TPL 提供相同的行为而不使用并发集合?

Pan*_*vos 5

如果删除除最后两个编辑之外的所有内容会更好。

如果问题是Can you pass an object per task (not thread) when using Parallel.?答案是:是的,你可以通过任何重载接受本地状态,即有TLocal型像这一个

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource,?ParallelLoopState,?TLocal,?TLocal> body,
    Action<TLocal> localFinally
)
Run Code Online (Sandbox Code Playgroud)

Parallel.For不使用线程。它对数据进行分区并为每个分区创建一个任务。每个任务最终都会处理一个分区的所有数据。通常,Parallel使用与核心数量一样多的任务。它还使用当前线程进行处理,这就是它出现阻塞当前线程的原因。不是,它开始用于处理分区之一。

处理本地数据的函数允许您生成初始本地值并将其传递给每次body调用。本地数据的所有重载都需要body重新调整(可能已修改的)数据,因此Parallel它本身不必存储它。这是必不可少的,因为Parallel.可以终止和重新启动任务。如果它必须跟踪本地数据,它将无法轻松或高效地做到这一点。

对于这个特定的例子,绕过 ORM 不适合批量操作的事实,尤其是在处理数十万个对象时,localInit应该创建一个新会话。body应该使用并返回该会话,而最后localFinally应该处理它。

var mySessionFactory
var myData=....;
Parallel.ForEach(
    myData,
    ()=>CreateSession(),
    (record,state,session)=>{
        //process the data etc.
        return session;
    },
    (session)=>session.Dispose()
);
Run Code Online (Sandbox Code Playgroud)

不过还有一些警告。NH 将更改保留在内存中,直到它们被刷新并清除缓存。这会产生内存问题。一种解决方案是保持计数并定期刷新数据。状态可以是一个(int counter,Session session)元组,而不是会话:

Parallel.ForEach(
    myData,
    ()=>(counter:0,session:CreateSession()),
    (record,state,localData)=>{
        var (counter,session)=localData;
        //process the data etc.
        if (counter % 1000 ==0)
        {
            session.Flush();
            session.Clear();
        }
        return (++counter,session);
    },
    data=>data.session.Dispose()
);
Run Code Online (Sandbox Code Playgroud)

一个更好的解决办法是一批提前的对象,这样,而不是一个IEnumerable<MyRecord>循环将在工作IEnumerable<MyRecord[]>阵列。结合批处理语句,这将减少 ORM 对批量操作施加的性能损失。

编写Batch方法并不难,但MoreLinq已经提供了一个方法,可作为源代码或 NuGet 包使用:

var myBatches=myData.Batch(1000);
Parallel.ForEach(
    myBatches,
    ()=>CreateSession(),
    (records,state,session)=>{

        foreach(var record in records)
        {
            //process the data etc.
            session.Save(record);                
        }
        session.Flush();
        session.Clear();
        return session;
    },
    data=>data.session.Dispose()
);
Run Code Online (Sandbox Code Playgroud)