.Net核心队列后台任务

joh*_*y 5 4 c# events async-await .net-core

在发送HTTP响应后,Slender回答了我最初的问题,即发生什么事情并忘记了,但是我现在剩下的问题是如何正确地将后台任务排队

编辑

众所周知,异步void通常是不好的,除了事件处理程序外,我想执行一些后台逻辑而不必等待客户端。我最初的想法是使用“火与遗忘”

假设我有一个事件:

public event EventHandler LongRunningTask;
Run Code Online (Sandbox Code Playgroud)

然后有人订阅了一个即发即弃任务:

LongRunningTask += async(s, e) => { await LongNetworkOperation;};
Run Code Online (Sandbox Code Playgroud)

Web api方法是调用:

[HttpGet]
public async IActionResult GetTask()
{
    LongRunningTask?.Invoke(this, EventArgs.Empty);
    return Ok();
}
Run Code Online (Sandbox Code Playgroud)

但是,如果我这样做不能保证我的长期运行任务能够完成,那么我该如何处理正在运行的后台任务而不影响发出请求所花费的时间(例如,我不想等待任务先完成) )?

小智 18

只是想为@johnny5 答案添加一些额外的注释。现在您可以使用https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/而不是 ConcurrentQueue 与 Semaphore。代码将是这样的:

public class HostedService: BackgroundService
{
        private readonly ILogger _logger;
        private readonly ChannelReader<Stream> _channel;

        public HostedService(
            ILogger logger,
            ChannelReader<Stream> channel)
        {
            _logger = logger;
            _channel = channel;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            await foreach (var item in _channel.ReadAllAsync(cancellationToken))
            {
                try
                {
                    // do your work with data
                }
                catch (Exception e)
                {
                    _logger.Error(e, "An unhandled exception occured");
                }
            }
        }
}

[ApiController]
[Route("api/data/upload")]
public class UploadController : ControllerBase
{
    private readonly ChannelWriter<Stream> _channel;

    public UploadController (
        ChannelWriter<Stream> channel)
    {
        _channel = channel;
    }

    public async Task<IActionResult> Upload([FromForm] FileInfo fileInfo)
    {
        var ms = new MemoryStream();
        await fileInfo.FormFile.CopyToAsync(ms);
        await _channel.WriteAsync(ms);
        return Ok();
    }
}
Run Code Online (Sandbox Code Playgroud)


joh*_*y 5 12

.Net Core 2.1具有IHostedService,它将在后台安全运行任务。我发现在一个实例文档对于QueuedHostedService我已经modied使用BackgroundService。

public class QueuedHostedService : BackgroundService
{

    private Task _backgroundTask;
    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected async override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (false == stoppingToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(stoppingToken);
            try
            {
                await )workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                this._logger.LogError(ex, $"Error occurred executing {nameof(workItem)}.");
            }
        }
    }
}

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(
        Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,我们可以在后台安全地将任务排队,而不会影响响应请求所花费的时间。