Lio*_*ioH 5 c# asp.net parallel-processing asp.net-core asp.net-core-hosted-services
我有一个 ASP .NET core Web API,它使用此处描述的排队后台任务 。
我已使用提供的代码示例并添加了IBackgroundTaskQueue
,BackgroundTaskQueue
和 ,QueuedHostedService
完全按照文章中所述。
在 my 中Startup.cs
,我仅注册一个QueuedHostedService
实例,如下所示:services.AddHostedService<QueuedHostedService>();
来自 WebApi 控制器的任务将被入队,然后出队并由QueuedHostedService
.
我希望允许多个后台处理线程使传入的任务出队并执行。我能想到的最直接的解决方案是QueuedHostedService
在我的Startup.cs
. 即,像这样:
int maxNumOfParallelOperations;
var isValid = int.TryParse(Configuration["App:MaxNumOfParallelOperations"], out maxNumOfParallelOperations);
maxNumOfParallelOperations = isValid && maxNumOfParallelOperations > 0 ? maxNumOfParallelOperations : 2;
for (int index = 0; index < maxNumOfParallelOperations; index++)
{
services.AddHostedService<QueuedHostedService>();
}
Run Code Online (Sandbox Code Playgroud)
我还注意到,由于 中的信号量BackgroundTaskQueue
,QueuedHostedService
实例并不是一直在工作,而是仅在队列中有新任务可用时才唤醒。
这个解决方案在我的测试中似乎效果很好。
但是,在这个特定的用例中 - 它真的是一个有效的、推荐的并行处理解决方案吗?
您可以使用IHostedService
具有多个线程的 来使用IBackgroundTaskQueue
.
这是一个基本的实现。我假设您正在使用相同的内容IBackgroundTaskQueue
并在此处BackgroundTaskQueue
进行描述。
public class QueuedHostedService : IHostedService
{
private readonly ILogger _logger;
private readonly Task[] _executors;
private readonly int _executorsCount = 2; //--default value: 2
private CancellationTokenSource _tokenSource;
public IBackgroundTaskQueue TaskQueue { get; }
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILoggerFactory loggerFactory,
IConfiguration configuration)
{
TaskQueue = taskQueue;
_logger = loggerFactory.CreateLogger<QueuedHostedService>();
if (ushort.TryParse(configuration["App:MaxNumOfParallelOperations"], out var ct))
{
_executorsCount = ct;
}
_executors = new Task[_executorsCount];
}
public Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is starting.");
_tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
for (var i = 0; i < _executorsCount; i++)
{
var executorTask = new Task(
async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
#if DEBUG
_logger.LogInformation("Waiting background task...");
#endif
var workItem = await TaskQueue.DequeueAsync(cancellationToken);
try
{
#if DEBUG
_logger.LogInformation("Got background task, executing...");
#endif
await workItem(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem)
);
}
}
}, _tokenSource.Token);
_executors[i] = executorTask;
executorTask.Start();
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Queued Hosted Service is stopping.");
_tokenSource.Cancel(); // send the cancellation signal
if (_executors != null)
{
// wait for _executors completion
Task.WaitAll(_executors, cancellationToken);
}
return Task.CompletedTask;
}
}
Run Code Online (Sandbox Code Playgroud)
ConfigureServices
您需要在课堂上注册服务Startup
。
...
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
...
Run Code Online (Sandbox Code Playgroud)
另外,您可以在配置中设置线程数(appsettings.json
)
...
"App": {
"MaxNumOfParallelOperations": 4
}
...
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6670 次 |
最近记录: |