Kis*_*jar 9 c# multithreading task-parallel-library c#-4.0 parallel.foreach
我有一个应用程序,我有1个大文件的1000多个小部分.
我必须一次最多上传16个零件.
我使用.Net的Thread并行库.
我使用Parallel.For来划分多个部分并分配1个应该为每个部分执行的方法,并将DegreeOfParallelism设置为16.
我需要使用由不同部件上传生成的校验和值来执行1方法,因此我必须设置某些机制,我必须等待所有部件上传说1000完成.在TPL库中,我面临的问题是它是从1000中随机执行16个线程中的任何一个.
我想要一些机制,我可以在最初运行前16个线程,如果第一个或第二个或任何16个线程完成其任务,则应该启动第17个部分.
我怎样才能实现这一目标?
一个可能的候选者可以是TPL Dataflow.这是一个演示,它接收整数流并将它们打印到控制台.您可以设置为MaxDegreeOfParallelism
希望并行旋转的多个线程:
void Main()
{
var actionBlock = new ActionBlock<int>(
i => Console.WriteLine(i),
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16});
foreach (var i in Enumerable.Range(0, 200))
{
actionBlock.Post(i);
}
}
Run Code Online (Sandbox Code Playgroud)
如果您想拥有多个生产者/消费者,这也可以很好地扩展.
这是执行此操作的手动方法。
你需要一个队列。队列是待处理任务的序列。您必须将它们出队并将其放入工作任务列表中。任务完成后,将其从工作任务列表中删除,并从队列中取出另一个任务。主线程控制这个过程。以下是如何执行此操作的示例。
对于测试,我使用了整数列表,但它应该适用于其他类型,因为它使用泛型。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
ParallelQueue(items, DoWork);
}
private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (pending.Count != 0 && working.Count < 16) // Maximum tasks
{
var item = pending.Dequeue(); // get item from queue
working.Add(Task.Run(() => action((T)item))); // run task
}
else
{
Task.WaitAny(working.ToArray());
working.RemoveAll(x => x.IsCompleted); // remove finished tasks
}
}
}
private static void DoWork(int i) // do your work here.
{
// this is just an example
Task.Delay(i).Wait();
Console.WriteLine(i);
}
Run Code Online (Sandbox Code Playgroud)
如果您遇到如何自行实施 DoWork 的问题,请告诉我。因为如果您更改方法签名,您可能需要进行一些更改。
更新
您还可以使用 async wait 来执行此操作,而不会阻塞主线程。
private static void Main()
{
Random r = new Random();
var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();
Task t = ParallelQueue(items, DoWork);
// able to do other things.
t.Wait();
}
private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
Queue pending = new Queue(items);
List<Task> working = new List<Task>();
while (pending.Count + working.Count != 0)
{
if (working.Count < 16 && pending.Count != 0)
{
var item = pending.Dequeue();
working.Add(Task.Run(async () => await func((T)item)));
}
else
{
await Task.WhenAny(working);
working.RemoveAll(x => x.IsCompleted);
}
}
}
private static async Task DoWork(int i)
{
await Task.Delay(i);
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
3326 次 |
最近记录: |