Ser*_*gey 5 c# multithreading windows-services threadpool async-await
我正在编写一个Windows服务,它将启动多个工作线程,这些线程将监听Amazon SQS队列并处理消息.将有大约20个线程监听10个队列.
线程必须始终运行,这就是为什么我倾向于实际使用实际线程作为工作循环而不是线程池线程.
这是一个顶级实现.Windows服务将启动多个工作线程,每个线程都会监听它的队列和处理消息.
protected override void OnStart(string[] args)
{
     for (int i = 0; i < _workers; i++)
   {
      new Thread(RunWorker).Start();
   }
}
这是工作的实施
public async void RunWorker()
{
  while(true)
  {
    // .. get message from amazon sqs sync.. about 20ms
    var message = sqsClient.ReceiveMessage();
    try
    {
       await PerformWebRequestAsync(message);
       await InsertIntoDbAsync(message);
    }
    catch(SomeExeception)
    {
       // ... log
       //continue to retry
       continue;
    }
    sqsClient.DeleteMessage();
  }
}
我知道我可以使用Task.Run执行相同的操作并在线程池线程上执行它而不是启动单个线程,但我没有看到原因,因为每个线程将始终运行.
你看到这个实现有什么问题吗?让线程始终以这种方式运行是多么可靠,我该怎么做才能确保每个线程始终在运行?
现有解决方案的一个问题是,RunWorker尽管是在新线程(即 )上,但您还是以一种即发即忘的方式调用您的new Thread(RunWorker).Start()解决方案。
RunWorker是一个方法,当执行点命中第一个(即)async时,它将返回给调用者。如果返回挂起的任务,则返回并且刚刚启动的新线程终止。awaitawait PerformWebRequestAsync(message)PerformWebRequestAsyncRunWorker
我认为你根本不需要一个新线程,只需使用AmazonSQSClient.ReceiveMessageAsync它await的结果即可。async void另一件事是,除非您真的不关心跟踪异步任务的状态,否则您不应该使用方法。async Task代替使用。
您的代码可能如下所示:
List<Task> _workers = new List<Task>();
CancellationTokenSource _cts = new CancellationTokenSource();
protected override void OnStart(string[] args)
{
  for (int i = 0; i < _MAX_WORKERS; i++)
  {
    _workers.Add(RunWorkerAsync(_cts.Token)); 
  }
}
public async Task RunWorkerAsync(CancellationToken token)
{
  while(true)
  {
    token.ThrowIfCancellationRequested();
    // .. get message from amazon sqs sync.. about 20ms
    var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);
    try
    {
       await PerformWebRequestAsync(message);
       await InsertIntoDbAsync(message);
    }
    catch(SomeExeception)
    {
       // ... log
       //continue to retry
       continue;
    }
    sqsClient.DeleteMessage();
  }
}
现在,要停止所有待处理的工作人员,您可以简单地执行此操作(从主“请求调度程序”线程):
_cts.Cancel();
try
{
    Task.WaitAll(_workers.ToArray()); 
}
catch (AggregateException ex) 
{
    ex.Handle(inner => inner is OperationCanceledException);
}
注意,ConfigureAwait(false)对于 Windows 服务来说是可选的,因为默认情况下初始线程上没有同步上下文。但是,我会保持这种方式,以使代码独立于执行环境(对于存在同步上下文的情况)。
最后,如果由于某种原因你不能使用,或者你需要调用另一个阻塞 API,或者只是在的开头ReceiveMessageAsync做一些 CPU 密集型工作,只需将其包装起来(而不是包装整个):RunWorkerAsyncTask.RunRunWorkerAsync
var message = await Task.Run(
    () => sqsClient.ReceiveMessage()).ConfigureAwait(false);
| 归档时间: | 
 | 
| 查看次数: | 2563 次 | 
| 最近记录: |