如何避免在.NET Core中的异步调用中释放上下文

ant*_*o12 0 c# backgroundworker async-await .net-core

我的 api 中有一些缓慢的调用,因此为了不阻塞我的 UI,我按照本教程实现了一个后台工作服务。在我里面_backgroundWorkerQueue我有

_backgroundWorkerQueue.QueueBackgroundWorkItem(async token =>
{
    await client.ExecuteAsync(request, CancellationToken.None);
    await _projectRepository.Update(id, "Update", "unlock");
});
Run Code Online (Sandbox Code Playgroud)

第二行,await _projectRepository.Update向我抛出一个错误,表明上下文已被处理并且更新失败。我将服务设置为瞬态,并将上下文设置为瞬态,以便以这种方式进行测试,但我仍然遇到相同的错误。如果可能的话,在不使用 Hangfire 等其他库的情况下,如何避免和解决这个问题的任何想法和想法。

Pan*_*vos 5

首先,您不需要为此创建一个新类。Channel类所做的事情远远不止这些BackgroundWorkerQueue

至于具体问题,在后台服务文档 后台任务与托管服务在 ASP.NET Core 中的“在后台任务中使用范围服务”部分中进行了描述。问题在于托管服务本质上是单例,而不是瞬态的。它在应用程序启动时创建,并在应用程序停止时释放。服务本身注册为瞬态服务,但由于其所有者是应用程序主机本身,因此它充当单例。

文档中显示的解决方案是注入IServiceProvider到您的服务中,并根据需要使用它构建新的范围和新的服务

要在BackgroundService 中使用作用域服务,请创建一个作用域。默认情况下,不会为托管服务创建范围。

public class ConsumeScopedServiceHostedService : BackgroundService
{
    private readonly ILogger<ConsumeScopedServiceHostedService> _logger;

    public ConsumeScopedServiceHostedService(IServiceProvider services, 
        ILogger<ConsumeScopedServiceHostedService> logger)
    {
        Services = services;
        _logger = logger;
    }

    public IServiceProvider Services { get; }
    ...

    private async Task DoWork(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            "Consume Scoped Service Hosted Service is working.");

        using (var scope = Services.CreateScope())
        {
            var scopedProcessingService = 
                scope.ServiceProvider
                    .GetRequiredService<IScopedProcessingService>();

            await scopedProcessingService.DoWork(stoppingToken);
        }
    }
Run Code Online (Sandbox Code Playgroud)

下一节排队后台任务将使用基于通道的Func<CancellationToken, ValueTask>队列来解决您所描述的场景

历史记录:文档过去使用 QueueBackgroundWorkItem。当示例代码更改时,某些文档保持不变。该QueueBackgroundWorkItemAsync方法可能应该命名为EnqueueAsync. 该示例确实抽象了底层队列实现

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public BackgroundTaskQueue(int capacity)
    {
        // Capacity should be set based on the expected application load and
        // number of concurrent threads accessing the queue.            
        // BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
        // which completes only when space became available. This leads to backpressure,
        // in case too many publishers/calls start accumulating.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}
Run Code Online (Sandbox Code Playgroud)

IBackgroundTaskQueue服务应该被注入到生产者和BackgroundService 类中。生产者将用于QueueBackgroundWorkItemAsync发布工作,服务将用于DequeueAsync接收任务:

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"Queued Hosted Service is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");

        await BackgroundProcessing(stoppingToken);
    }

    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = 
                await TaskQueue.DequeueAsync(stoppingToken);

            try
            {
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                    "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}
Run Code Online (Sandbox Code Playgroud)

可以通过返回 aChannelReader<Func<CancellationToken, ValueTask>>或IAsyncEnumerable<>` 隐藏内部实现来简化示例代码IAsyncEnumerable<Func<CancellationToken, ValueTask>>. Using ,就像原始示例一样:

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);

    IAsyncEnumerable<Func<CancellationToken, ValueTask>> ReadAllAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{

...
    public IAsyncEnumerable<Func<CancellationToken, ValueTask>> ReadAllAsync(
        CancellationToken cancellationToken)
    {
        return _queue.Reader.ReadAllAsync(cancellationToken);
    }
}
Run Code Online (Sandbox Code Playgroud)

这允许await foreachBackgroundProcessing方法中使用:

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    await foreach(var workItem in TaskQueue.ReadAllAsync(stoppingToken)
    {
        try
        {
            await workItem(stoppingToken);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, 
                "Error occurred executing {WorkItem}.", nameof(workItem));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

结合我们的范围服务,并假设队列接受Func<IProductRepository,CancellationToken, ValueTask>

private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    await foreach(var workItem in TaskQueue.ReadAllAsync(stoppingToken)
    {
        try
        {
            using (var scope = Services.CreateScope())
            {
                var repo = scope.ServiceProvider
                                .GetRequiredService<IProjectRepository>();
                await workItem(repo,stoppingToken);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, 
                "Error occurred executing {WorkItem}.", nameof(workItem));
        }        
    }
}
Run Code Online (Sandbox Code Playgroud)