Mat*_*gan 28 .net c# asynchronous
I have a collection of 1000 input messages to process. I loop over the input collection and start a new task for each message to process it.
// Assume this messages collection contains 1000 items
var messages = new List<string>();
foreach (var msg in messages)
{
    Task.Factory.StartNew(() =>
    {
        Process(msg);
    });
}
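Can we predict the maximum number of messages that will be processed concurrently (assuming an ordinary quad-core processor), or can we limit the maximum number of messages processed at a time?
How can I make sure these messages are processed in the same order as the collection?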
Har*_*sad 39
You can use Parallel.ForEach and rely on MaxDegreeOfParallelism.
Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = 10 },
    msg =>
    {
        // logic
        Process(msg);
    });
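To check that the cap actually holds, here is a small sketch (not part of the answer above) that runs Parallel.ForEach with a given MaxDegreeOfParallelism and records the peak number of loop bodies running at the same moment. The class name ParallelForEachDemo and the Thread.Sleep(10) stand-in for Process are hypothetical. MaxDegreeOfParallelism is only an upper bound: the observed peak can be lower, and Parallel.ForEach does not guarantee that items are processed in collection order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachDemo
{
    // Runs Parallel.ForEach with a MaxDegreeOfParallelism cap and returns the
    // highest number of loop bodies that were running at the same moment.
    public static int MaxObservedConcurrency(IEnumerable<string> messages, int maxDegree)
    {
        var gate = new object();
        int current = 0; // bodies running right now
        int peak = 0;    // highest value 'current' reached

        Parallel.ForEach(
            messages,
            new ParallelOptions { MaxDegreeOfParallelism = maxDegree },
            msg =>
            {
                lock (gate) { current++; peak = Math.Max(peak, current); }
                try
                {
                    Thread.Sleep(10); // hypothetical stand-in for Process(msg)
                }
                finally
                {
                    lock (gate) { current--; }
                }
            });

        return peak;
    }
}
```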
Cle*_*gic 25
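SemaphoreSlim is a very good solution in this case, and I strongly recommend that the OP try it, but @Manoj's answer has the flaw mentioned in the comments: you should wait on the semaphore before spawning each task, like this.
Updated answer: as @Vasyl pointed out, the semaphore may be disposed before the tasks complete, in which case calling Release() throws an exception. You must therefore wait for all created tasks to complete before exiting the using block.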
int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        // Blocks the loop until one of the maxConcurrency slots is free
        concurrencySemaphore.Wait();
        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                Process(msg);
            }
            finally
            {
                // Give the slot back even if Process throws
                concurrencySemaphore.Release();
            }
        });
        tasks.Add(t);
    }
    // Wait for every task before the using block disposes the semaphore
    Task.WaitAll(tasks.ToArray());
}
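A variant of the same pattern for asynchronous work, sketched under the assumption that the processing itself can be made async (ProcessAsync below is a hypothetical stand-in that just calls Task.Delay). SemaphoreSlim.WaitAsync waits for a free slot without blocking the calling thread the way Wait() does, and awaiting Task.WhenAll before the method returns keeps the semaphore alive until every task has released it. The returned peak is only there to show that the limit is respected.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleDemo
{
    // Starts one task per message, but at most 'maxConcurrency' of them are
    // inside ProcessAsync at any moment. Returns the peak concurrency observed.
    public static async Task<int> ProcessAllAsync(IEnumerable<string> messages, int maxConcurrency)
    {
        var gate = new object();
        int current = 0, peak = 0;

        using var semaphore = new SemaphoreSlim(maxConcurrency);
        List<Task> tasks = messages.Select(async msg =>
        {
            await semaphore.WaitAsync(); // waits for a free slot without blocking a thread
            lock (gate) { current++; peak = Math.Max(peak, current); }
            try
            {
                await ProcessAsync(msg);
            }
            finally
            {
                lock (gate) { current--; }
                semaphore.Release(); // give the slot back even if ProcessAsync throws
            }
        }).ToList(); // ToList starts every task now

        // Await all tasks before 'using' disposes the semaphore at the end of the method
        await Task.WhenAll(tasks);
        return peak;
    }

    // Hypothetical async stand-in for the question's Process(msg).
    private static Task ProcessAsync(string msg) => Task.Delay(10);
}
```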
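Without Task.WaitAll
For anyone who wants to see how the semaphore gets disposed (in reply to the comments): run the following code in a console application and this exception will be thrown:
System.ObjectDisposedException: 'The semaphore has been disposed.'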
static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();
            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });
            tasks.Add(t);
        }
        // Deliberately commented out: the using block now disposes the semaphore while
        // up to maxConcurrency tasks are still running, so their Release() calls throw
        // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}
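Whether the demo above hits the exception depends on timing. This deterministic sketch (the class name is hypothetical) reproduces the underlying cause directly: once a SemaphoreSlim has been disposed, Release() throws ObjectDisposedException, which is what the still-running tasks hit in their finally blocks.

```csharp
using System;
using System.Threading;

public static class DisposedSemaphoreDemo
{
    // Returns true if Release() on an already-disposed SemaphoreSlim throws
    // ObjectDisposedException.
    public static bool ReleaseAfterDisposeThrows()
    {
        var semaphore = new SemaphoreSlim(1);
        semaphore.Wait();    // a "task" takes the only slot...
        semaphore.Dispose(); // ...the using block ends and disposes the semaphore...
        try
        {
            semaphore.Release(); // ...then the task's finally block tries to give the slot back
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```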
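With .NET Core 3.0 and .NET 5.0, channels (System.Threading.Channels) are available as another option.
The main benefit of this producer/consumer concurrency pattern is that you can also throttle how fast the input data is read, which reduces resource usage.
This is especially useful when processing millions of data records.
Instead of reading the entire dataset into memory at once, you can query the data chunk by chunk and wait for the workers to process it before querying more.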
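To see the backpressure a bounded channel provides, here is a minimal sketch (class and method names are hypothetical; it assumes the default BoundedChannelFullMode.Wait that Channel.CreateBounded(int) uses): TryWrite is refused once the buffer is full, and a WriteAsync issued while the channel is full only completes after a reader removes an item.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelDemo
{
    // Tries to write 'attempts' items into a channel of the given capacity and
    // reports which writes were accepted. Nothing reads, so the buffer fills up.
    public static bool[] TryFill(int capacity, int attempts)
    {
        var channel = Channel.CreateBounded<int>(capacity);
        var accepted = new bool[attempts];
        for (int i = 0; i < attempts; i++)
            accepted[i] = channel.Writer.TryWrite(i); // false once the buffer is full
        return accepted;
    }

    // Returns true if a WriteAsync on a full channel was still pending
    // until a reader freed a slot.
    public static async Task<bool> WriteWaitsForReaderAsync()
    {
        var channel = Channel.CreateBounded<int>(1);
        await channel.Writer.WriteAsync(1);         // buffer is now full
        var pending = channel.Writer.WriteAsync(2); // cannot complete yet
        bool waitedWhileFull = !pending.IsCompleted;
        channel.Reader.TryRead(out _);              // a consumer frees a slot
        await pending;                              // now the second write goes through
        return waitedWhileFull;
    }
}
```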
Code example with a queue capacity of 50 messages and as many consumer tasks as there are logical processors:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

/// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
public static async Task ProcessMessages(
    List<string> messages,
    int producerCapacity = 50,
    int consumerTaskLimit = 0)
{
    if (consumerTaskLimit == 0)
        consumerTaskLimit = Environment.ProcessorCount;
    // by default only uses one processor group
    // https://stackoverflow.com/questions/27965962/c-sharp-environment-processorcount-does-not-always-return-the-full-number-of-log

    var tokenSource = new CancellationTokenSource();
    CancellationToken ct = tokenSource.Token;
    var channel = Channel.CreateBounded<string>(producerCapacity);

    // Producer: pushes the messages into the bounded channel
    _ = Task.Run(async () =>
    {
        try
        {
            foreach (var msg in messages)
            {
                // waits (asynchronously) while the channel is full,
                // until the consumer tasks pop messages from the queue
                await channel.Writer.WriteAsync(msg, ct);
                ct.ThrowIfCancellationRequested();
            }
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            Console.WriteLine("Exception while processing Messages\n" + ex);
            tokenSource.Cancel();
        }
        finally
        {
            // signal the end of the queue so that
            // WaitToReadAsync returns false and the consumer tasks stop
            channel.Writer.Complete();
        }
    });

    // Consumers: consumerTaskLimit tasks drain the channel concurrently
    var consumerTasks = Enumerable
        .Range(1, consumerTaskLimit)
        .Select(_ => Task.Run(async () =>
        {
            try
            {
                while (await channel.Reader.WaitToReadAsync(ct))
                {
                    ct.ThrowIfCancellationRequested();
                    while (channel.Reader.TryRead(out var message))
                    {
                        // simulated work; replace with the real processing
                        await Task.Delay(500);
                        Console.WriteLine(message);
                    }
                }
            }
            catch (OperationCanceledException) { }
            catch
            {
                // stop the producer and the remaining consumers
                tokenSource.Cancel();
                throw;
            }
        }))
        .ToArray();

    Task waitForConsumers = Task.WhenAll(consumerTasks);
    try { await waitForConsumers; }
    catch
    {
        if (waitForConsumers.IsFaulted && waitForConsumers.Exception is not null)
        {
            foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
                Console.WriteLine(e.ToString());
            throw waitForConsumers.Exception.Flatten();
        }
        else throw;
    }
}
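As Theodor Zoulias pointed out: when consumers throw, the remaining tasks keep running and have to take over the load of the failed ones. To avoid this, I added a CancellationToken that stops all remaining tasks, and I handle the exceptions combined in the AggregateException of waitForConsumers.Exception.
Side note:
The Task Parallel Library (TPL) may be good at automatically limiting tasks based on local resources. However, when you process data remotely via RPC, you need to throttle the RPC calls manually to avoid flooding the network/processing stack!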
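One way to do that manual throttling, sketched under the assumption that you can target .NET 6 or later (Parallel.ForEachAsync does not exist in earlier versions): cap the number of in-flight calls with MaxDegreeOfParallelism. FakeRpcAsync is a hypothetical placeholder that only simulates latency with Task.Delay, and the returned peak is only there to show that the cap is respected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class RpcThrottleDemo
{
    // Calls a (hypothetical) remote endpoint for every id, with at most
    // 'maxInFlight' calls outstanding at once. Returns the peak observed.
    public static async Task<int> CallAllAsync(IEnumerable<int> ids, int maxInFlight)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxInFlight };
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
            try
            {
                await FakeRpcAsync(id, ct);
            }
            finally
            {
                lock (gate) { inFlight--; }
            }
        });

        return peak;
    }

    // Hypothetical RPC: simulated network latency only.
    private static Task FakeRpcAsync(int id, CancellationToken ct) => Task.Delay(20, ct);
}
```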
小智 7
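I think using Parallel.ForEach is better (note: Parallel.ForEach belongs to the TPL's Parallel class, not to Parallel LINQ):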
Parallel.ForEach(messages,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    x => Process(x));
where 4 is the MaxDegreeOfParallelism (x is the current message).
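For comparison, this is what an actual PLINQ version looks like. It is a sketch (the class name and the upper-casing Process stand-in are hypothetical) that limits the workers with WithDegreeOfParallelism and uses AsOrdered so the results come back in the same order as the input collection, which addresses the ordering part of the question. AsOrdered only orders the output; the items can still be processed out of order, so if the processing order itself matters, the work has to run sequentially.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PlinqOrderedDemo
{
    // PLINQ with at most 'maxDegree' concurrent workers; AsOrdered() makes the
    // results come back in source order even if items finish out of order.
    public static List<string> ProcessOrdered(IEnumerable<string> messages, int maxDegree)
    {
        return messages
            .AsParallel()
            .AsOrdered()
            .WithDegreeOfParallelism(maxDegree)
            .Select(msg => Process(msg))
            .ToList();
    }

    // Hypothetical stand-in for Process(msg) that returns a result.
    private static string Process(string msg)
    {
        Thread.Sleep(5); // simulated work
        return msg.ToUpperInvariant();
    }
}
```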