Parallel.For循环 - 为每个线程分配唯一的数据实体

Mri*_*boj 4 c# parallel-processing multithreading task-parallel-library parallel.foreach

我有100条并行化记录,从1到100,现在我可以方便地使用Parallel.For在Parallel中执行它们如下,这将基于计算资源工作

 Parallel.For(0, limit, i =>
    {
        DoWork(i);
    });
Run Code Online (Sandbox Code Playgroud)

但是有一些限制,每个线程需要使用相同的数据实体,并且数量有限的数据实体说10,它们是通过相互克隆并将它们保存在像Dictionary或List这样的结构中而先进创建的.现在我可以使用以下代码限制并行化的数量:

 Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DoWork(i);
    });
Run Code Online (Sandbox Code Playgroud)

但问题是如何为每个传入线程分配一个唯一的数据实体,这样任何其他当前线程都不会使用Data实体,因为线程和数据实体的数量是相同的,所以饥饿不是问题.我可以想到,我为每个数据实体创建一个布尔值,指定它是否正在使用,因此我们遍历字典或列表以查找下一个可用数据实体并锁定整个分配过程,以便一个线程在给定时间被分配了一个数据实体,但在我看来这个问题将有更优雅的解决方案,我的版本只是一个解决方法,而不是真正的修复.我的逻辑是:

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
        {
            lock(All_Threads_Common_Object)
            {
              Check for available data entity using boolean
              Assign the Data entity
            }
            DoWork(i);

            Reset the Boolean value for another thread to use it
        });
Run Code Online (Sandbox Code Playgroud)

如果问题需要进一步澄清,请告诉我

Rot*_*tem 6

使用其重载Parallel.For接受线程本地初始化函数.

Parallel.For<DataEntity>(0, limit, 
    //will run once for each thread
    () => GetThreadLocalDataEntity(),

    //main loop body, will run once per iteration
    (i, loop, threadDataEntity) =>
    {
        DoWork(i, threadDataEntity);
        return threadDataEntity; //we must return it here to adhere to the Func signature.
    },

    //will run once for each thread after the loop
    (threadDataEntity) => threadDataEntity.Dispose() //if necessary
);
Run Code Online (Sandbox Code Playgroud)

这个方法与你在问题中发布的方法的主要优点是,DataEntity每个线程分配一次,而不是每次循环迭代一次.

  • @MrinalKamboj难道你还不能回来吗?没有效果. (2认同)

Das*_*ter 5

您可以使用并发集合来存储10个对象.每个工作人员将拉出一个数据实体,使用它并将其还原.并发收集的使用很重要,因为在您的方案中,正常的收集不是线程安全的.

像这样:

var queue = new ConcurrentQueue<DataEntity>();
// fill the queue with 10 items

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DataEntity x;
        if(!queue.TryDequeue(out x))
            throw new InvalidOperationException();
        DoWork(i, x);
        queue.Enqueue(x);
    });
Run Code Online (Sandbox Code Playgroud)

或者,如果需要提供阻塞,请将事物包装在BlockingCollection中.

编辑:你把它包在一个循环中继续等待.相反,使用像这样的BlockingCollection:

var entities = new BlockingCollection(new ConcurrentQueue<DataEntity>());

// fill the collection with 10 items

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DataEntity x = entities.Take();
        DoWork(i, x);
        entities.Add(x);
    });
Run Code Online (Sandbox Code Playgroud)

  • 好吧,你的例子暗示数据实体不是"被处理掉",而是"可重用".因此,您有10个数据实体和10个工作人员.在工作完成之前,实体会被放回.你还说"由于线程和数据实体的数量是相同的**饥饿不是问题**".所以,如果我理解你的问题,那么异常就不应该抛出. (2认同)