Man*_*ano 1 c# multithreading producer-consumer task-parallel-library
我仅限于.NET 3.5,并且正在使用TPL。该方案是生产者-消费者,但是没有阻塞的问题。由于种种限制,不能在这种情况下使用PLINQ,而我们想要实现的是生产许多物品的最快方法(每次生产都是长期运行的,并且物品数量超过100,000),但是每个物品必须以FIFO顺序消费(这意味着,我要求生产的第一个商品必须首先被消费,即使它是在其他商品之后创建的)也必须尽快消费。
对于这个问题,我尝试使用任务列表,等待列表中的第一项完成(taskList.First()。IsCompleted()),然后在其上使用消耗函数,但是由于某些原因,我似乎用光了内存(可能由于等待启动的任务而导致任务列表中的项目过多?)还有更好的方法吗?(我正在努力实现最快的速度)
非常感谢!
编辑后确定-将结果添加到BlockingCollection中,而不是将结果添加到BlockingCollection中。它具有按顺序处理项目的功能,并且具有最大的并行性,可以防止过多的线程启动,并且您会耗尽所有内存。
https://dotnetfiddle.net/lUbSqB
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
public class Program
{
private static BlockingCollection<Task<int>> BlockingCollection {get;set;}
public static void Producer(int numTasks)
{
Random r = new Random(7);
for(int i = 0 ; i < numTasks ; i++)
{
int closured = i;
Task<int> task = new Task<int>(()=>
{
Thread.Sleep(r.Next(100));
Console.WriteLine("Produced: " + closured);
return closured;
});
BlockingCollection.Add(task);
task.Start();
}
BlockingCollection.CompleteAdding();
}
public static void Main()
{
int numTasks = 20;
int maxParallelism = 3;
BlockingCollection = new BlockingCollection<Task<int>>(maxParallelism);
Task.Factory.StartNew(()=> Producer(numTasks));
foreach(var task in BlockingCollection.GetConsumingEnumerable())
{
task.Wait();
Console.WriteLine(" Consumed: "+ task.Result);
task.Dispose();
}
}
}
Run Code Online (Sandbox Code Playgroud)
结果:
Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
Produced: 3
Produced: 2
Consumed: 2
Consumed: 3
Produced: 4
Consumed: 4
Produced: 6
Produced: 5
Consumed: 5
Consumed: 6
Produced: 7
Consumed: 7
Produced: 8
Consumed: 8
Produced: 10
Produced: 9
Consumed: 9
Consumed: 10
Produced: 12
Produced: 13
Produced: 11
Consumed: 11
Consumed: 12
Consumed: 13
Produced: 15
Produced: 14
Consumed: 14
Consumed: 15
Produced: 17
Produced: 16
Produced: 18
Consumed: 16
Consumed: 17
Consumed: 18
Produced: 19
Consumed: 19
Run Code Online (Sandbox Code Playgroud)