Zap*_*ica 7 c# multithreading task-parallel-library async-await
我想比较两种理论情景.为了这个问题,我简化了案例.但基本上它是典型的生产者消费者情景.(我专注于消费者).
我有一个很大的Queue<string> dataQueue,我必须传输到多个客户端.
所以让我们从更简单的情况开始:
class SequentialBlockingCase
{
public static Queue<string> DataQueue = new Queue<string>();
private static List<string> _destinations = new List<string>();
/// <summary>
/// Is the main function that is run in its own thread
/// </summary>
private static void Run()
{
while (true)
{
if (DataQueue.Count > 0)
{
string data = DataQueue.Dequeue();
foreach (var destination in _destinations)
{
SendDataToDestination(destination, data);
}
}
else
{
Thread.Sleep(1);
}
}
}
private static void SendDataToDestination(string destination, string data)
{
//TODO: Send data using http post, instead simulate the send
Thread.Sleep(200);
}
}
}
Run Code Online (Sandbox Code Playgroud)
现在这个设置工作正常.它坐在那里并进行轮询Queue,当有数据要发送时,它会将其发送到所有目的地.
问题:
所以这是我的第二次尝试:
class ParalleBlockingCase
{
public static Queue<string> DataQueue = new Queue<string>();
private static List<string> _destinations = new List<string>();
/// <summary>
/// Is the main function that is run in its own thread
/// </summary>
private static void Run()
{
while (true)
{
if (DataQueue.Count > 0)
{
string data = DataQueue.Dequeue();
Parallel.ForEach(_destinations, destination =>
{
SendDataToDestination(destination, data);
});
}
else
{
Thread.Sleep(1);
}
}
}
private static void SendDataToDestination(string destination, string data)
{
//TODO: Send data using http post
Thread.Sleep(200);
}
}
Run Code Online (Sandbox Code Playgroud)
如果1个目的地缓慢或不可用,则此修订至少不会影响其他目的地.
但是这个方法仍然是阻塞的,我不确定是否Parallel.ForEach使用了线程池.我的理解是它将创建X个线程/任务并一次执行4个(4个核心cpu).但它必须在任务5开始之前完全完成芬兰任务1.
因此我的第三个选择:
class ParalleAsyncCase
{
public static Queue<string> DataQueue = new Queue<string>();
private static List<string> _destinations = new List<string> { };
/// <summary>
/// Is the main function that is run in its own thread
/// </summary>
private static void Run()
{
while (true)
{
if (DataQueue.Count > 0)
{
string data = DataQueue.Dequeue();
List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
var task = SendDataToDestination(destination, data);
task.Start();
tasks.Add(task);
}
//Wait for all tasks to complete
Task.WaitAll(tasks.ToArray());
}
else
{
Thread.Sleep(1);
}
}
}
private static async Task SendDataToDestination(string destination, string data)
{
//TODO: Send data using http post
await Task.Delay(200);
}
}
Run Code Online (Sandbox Code Playgroud)
现在从我理解这个选项,仍然会阻止主线程处于Task.WaitAll(tasks.ToArray());正常状态,因为我不希望它以比执行更快的速度创建任务.
但是并行执行的任务应该使用ThreadPool,并且所有X个任务应该立即开始执行,而不是阻塞或按顺序执行.(线程池将在它们变为活动状态时交换它们awaiting)
现在我的问题.
选项3是否比选项2具有任何性能优势.
特别是在更高性能的服务器端方案中.在我正在处理的特定软件中.上面我的简单用例会有多个实例.即几个消费者.
我对两种解决方案的理论差异和专业与缺点感兴趣,如果有的话,甚至可能是更好的第四种选择.
Ste*_*ary 10
Parallel.ForEach将使用线程池.异步代码不会,因为它根本不需要任何线程(链接到我的博客).
正如Mrinal所指出的,如果你有CPU绑定代码,并行性是合适的; 如果您有I/O绑定代码,则异步是合适的.在这种情况下,HTTP POST显然是I/O,因此理想的消费代码将是异步的.
如果有的话,甚至可能是更好的第四选择.
我建议让你的消费者完全异步.为此,您需要使用与异步兼容的生产者/消费者队列.在TPL Dataflow库中有一个相当高级的one (BufferBlock<T>),在我的AsyncEx库中有一个相当简单的one (AsyncProducerConsumerQueue<T>).
使用其中任何一个,您都可以创建完全异步的使用者:
List<Task> tasks = new List<Task>();
foreach (var destination in _destinations)
{
var task = SendDataToDestination(destination, data);
tasks.Add(task);
}
await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)
或者,更简化:
var tasks = _destinations
.Select(destination => SendDataToDestination(destination, data));
await Task.WhenAll(tasks);
Run Code Online (Sandbox Code Playgroud)
您的主要问题 - Parallel.ForEach 与 Async Forloop
computing operations,总是在内存中处理Parallel API利用从线程池中调用的线程来做一些工作,这就是调用的目的。IO bound operations,总是Async-Await,因为没有调用任何线程,并且它使用硬件功能IO completion ports在后台进行处理。由于 Async-Await 是首选选项,因此让我指出您的实现中的一些事项:
Synchronous您没有等待主要操作Send data using http post,所以正确的代码是await Http Post Async不是await Task.DelayAsync实现,例如Http post Async,则无需显式启动Task,只有在您有自定义的情况下才会出现这种情况Async方法时才会出现这种情况Task.WaitAll仅适用于控制台应用程序,该应用程序没有同步上下文或UI线程,否则会导致死锁,您需要使用Task.WhenAll现在关于Parallel approach
Parallel API确实可以工作Thread pool,并且大多数情况下它能够重用线程,从而进行优化,但是如果任务长时间运行,它可能最终会创建多个线程,以限制您可以使用构造函数选项new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },因此将最大数量限制为系统中逻辑核心的数量另一个重要的一点是,为什么绑定调用Parallel API是一个坏主意IO,因为每个线程都是昂贵的资源UI,包括创建,Thread environment block + User memory + Kernel Memory并且在 IO 操作中它闲置不做任何事情,从任何角度来看这都不好
| 归档时间: |
|
| 查看次数: |
2355 次 |
| 最近记录: |