ant*_*o12 0 c# backgroundworker async-await .net-core
我的 api 中有一些缓慢的调用,因此为了不阻塞我的 UI,我按照本教程实现了一个后台工作服务。在我里面_backgroundWorkerQueue我有
_backgroundWorkerQueue.QueueBackgroundWorkItem(async token =>
{
await client.ExecuteAsync(request, CancellationToken.None);
await _projectRepository.Update(id, "Update", "unlock");
});
Run Code Online (Sandbox Code Playgroud)
第二行,await _projectRepository.Update向我抛出一个错误,表明上下文已被处理并且更新失败。我将服务设置为瞬态,并将上下文设置为瞬态,以便以这种方式进行测试,但我仍然遇到相同的错误。如果可能的话,在不使用 Hangfire 等其他库的情况下,如何避免和解决这个问题的任何想法和想法。
首先,您不需要为此创建一个新类。Channel类所做的事情远远不止这些BackgroundWorkerQueue。
至于具体问题,在后台服务文档 后台任务与托管服务在 ASP.NET Core 中的“在后台任务中使用范围服务”部分中进行了描述。问题在于托管服务本质上是单例,而不是瞬态的。它在应用程序启动时创建,并在应用程序停止时释放。服务本身注册为瞬态服务,但由于其所有者是应用程序主机本身,因此它充当单例。
文档中显示的解决方案是注入IServiceProvider到您的服务中,并根据需要使用它构建新的范围和新的服务
要在BackgroundService 中使用作用域服务,请创建一个作用域。默认情况下,不会为托管服务创建范围。
public class ConsumeScopedServiceHostedService : BackgroundService
{
private readonly ILogger<ConsumeScopedServiceHostedService> _logger;
public ConsumeScopedServiceHostedService(IServiceProvider services,
ILogger<ConsumeScopedServiceHostedService> logger)
{
Services = services;
_logger = logger;
}
public IServiceProvider Services { get; }
...
private async Task DoWork(CancellationToken stoppingToken)
{
_logger.LogInformation(
"Consume Scoped Service Hosted Service is working.");
using (var scope = Services.CreateScope())
{
var scopedProcessingService =
scope.ServiceProvider
.GetRequiredService<IScopedProcessingService>();
await scopedProcessingService.DoWork(stoppingToken);
}
}
Run Code Online (Sandbox Code Playgroud)
下一节排队后台任务将使用基于通道的Func<CancellationToken, ValueTask>队列来解决您所描述的场景
历史记录:文档过去使用 QueueBackgroundWorkItem。当示例代码更改时,某些文档保持不变。该QueueBackgroundWorkItemAsync方法可能应该命名为EnqueueAsync. 该示例确实抽象了底层队列实现
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public BackgroundTaskQueue(int capacity)
{
// Capacity should be set based on the expected application load and
// number of concurrent threads accessing the queue.
// BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
// which completes only when space became available. This leads to backpressure,
// in case too many publishers/calls start accumulating.
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem == null)
{
throw new ArgumentNullException(nameof(workItem));
}
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
var workItem = await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
Run Code Online (Sandbox Code Playgroud)
该IBackgroundTaskQueue服务应该被注入到生产者和BackgroundService 类中。生产者将用于QueueBackgroundWorkItemAsync发布工作,服务将用于DequeueAsync接收任务:
public class QueuedHostedService : BackgroundService
{
private readonly ILogger<QueuedHostedService> _logger;
public QueuedHostedService(IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger)
{
TaskQueue = taskQueue;
_logger = logger;
}
public IBackgroundTaskQueue TaskQueue { get; }
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"Queued Hosted Service is running.{Environment.NewLine}" +
$"{Environment.NewLine}Tap W to add a work item to the " +
$"background queue.{Environment.NewLine}");
await BackgroundProcessing(stoppingToken);
}
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var workItem =
await TaskQueue.DequeueAsync(stoppingToken);
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Queued Hosted Service is stopping.");
await base.StopAsync(stoppingToken);
}
}
Run Code Online (Sandbox Code Playgroud)
可以通过返回 aChannelReader<Func<CancellationToken, ValueTask>>或IAsyncEnumerable<>` 隐藏内部实现来简化示例代码IAsyncEnumerable<Func<CancellationToken, ValueTask>>. Using ,就像原始示例一样:
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
IAsyncEnumerable<Func<CancellationToken, ValueTask>> ReadAllAsync(
CancellationToken cancellationToken);
}
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
...
public IAsyncEnumerable<Func<CancellationToken, ValueTask>> ReadAllAsync(
CancellationToken cancellationToken)
{
return _queue.Reader.ReadAllAsync(cancellationToken);
}
}
Run Code Online (Sandbox Code Playgroud)
这允许await foreach在BackgroundProcessing方法中使用:
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
await foreach(var workItem in TaskQueue.ReadAllAsync(stoppingToken)
{
try
{
await workItem(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
Run Code Online (Sandbox Code Playgroud)
结合我们的范围服务,并假设队列接受Func<IProductRepository,CancellationToken, ValueTask>:
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
await foreach(var workItem in TaskQueue.ReadAllAsync(stoppingToken)
{
try
{
using (var scope = Services.CreateScope())
{
var repo = scope.ServiceProvider
.GetRequiredService<IProjectRepository>();
await workItem(repo,stoppingToken);
}
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error occurred executing {WorkItem}.", nameof(workItem));
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1157 次 |
| 最近记录: |