从asp.net core 2中的控制器操作运行后台任务

Wax*_*ren 22 .net c# asp.net-core-mvc asp.net-core asp.net-core-2.0

我正在使用带有asp.net core 2.0的C#开发一个带有REST Api的Web应用程序

我想要实现的是当客户端向端点发送请求时,我将运行与客户端请求上下文分离的后台任务,如果任务成功启动,该任务将结束.

我知道有HostedService,但问题是HostedService在服务器启动时启动,据我所知,没有办法从控制器手动启动HostedService.

这是一个简单的代码来演示这个问题.

[Authorize(AuthenticationSchemes = "UsersScheme")]
public class UsersController : Controller
{

    [HttpPost]
    public async Task<JsonResult> StartJob([FromForm] string UserId, [FromServices] IBackgroundJobService backgroundService) {

           //check user account
           (bool isStarted, string data) result = backgroundService.Start();

           return JsonResult(result);
    }
}
Run Code Online (Sandbox Code Playgroud)

Fab*_*bio 27

您仍然可以将IHostedService后台任务的基础作为基础BlockingCollection.

创建包装器,BlockingCollection以便将其作为单例注入.

public class TasksToRun
{
    private readonly BlockingCollection<TaskSettings> _tasks;

    public TasksToRun() => _tasks = new BlockingCollection<TaskSettings>();

    public void Enqueue(TaskSettings settings) => _tasks.Add(settings);

    public TaskSettings Dequeue(CancellationToken token) => _tasks.Take(token);
}
Run Code Online (Sandbox Code Playgroud)

然后在执行IHostedService"监听"任务以及任务"到达"时执行它.
BlockingCollection如果集合为空,将停止执行 - 因此您的while循环不会消耗处理器时间.
.Take方法接受cancellationToken为参数.使用令牌,您可以在应用程序停止时取消"等待"下一个任务.

public class BackgroundService : IHostedService
{
    private readonly TasksToRun _tasks;

    private CancellationTokenSource _tokenSource;

    private Task _currentTask;

    public BackgroundService(TasksToRun tasks) => _tasks = tasks;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        while (cancellationToken.IsCancellationRequested == false)
        {
            try
            {
                var taskToRun = _tasks.Dequeue(_tokenSource.Token);

                // We need to save executable task, 
                // so we can gratefully wait for it's completion in Stop method
                _currentTask = ExecuteTask(taskToRun);               
                await _currentTask;
            }
            catch (OperationCanceledException)
            {
                // execution cancelled
            }
        }
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        _tokenSource.Cancel(); // cancel "waiting" for task in blocking collection

        if (_currentTask == null) return;

        // wait when _currentTask is complete
        await Task.WhenAny(_currentTask, Task.Delay(-1, cancellationToken));
    }
}
Run Code Online (Sandbox Code Playgroud)

在控制器中,您只需将要运行的任务添加到我们的集合中

public class JobController : Controller
{
    private readonly TasksToRun _tasks;

    public JobController(TasksToRun tasks) => _tasks = tasks;

    public IActionResult PostJob()
    {
        var settings = CreateTaskSettings();

        _tasks.Enqueue(settings);

        return Ok();
    }
}
Run Code Online (Sandbox Code Playgroud)

用于阻塞收集的包装器应该作为单例注册依赖注入

services.AddSingleton<TasksToRun, TasksToRun>();
Run Code Online (Sandbox Code Playgroud)

注册后台服务

services.AddHostedService<BackgroundService>();
Run Code Online (Sandbox Code Playgroud)

  • 这真的有效吗?我尝试实现并很快注意到托管服务的“StartAsync”方法中的“BlockingCollection”的“Take”方法阻止了服务启动。这还会阻止 ASP.NET 运行时启动,因为该服务从未完成注册。 (7认同)
  • @Fabio你的答案并不表明它是伪代码。此外,它看起来像是编写的完全有效的运行时代码,因此显然具有误导性。我建议您编辑答案以澄清。 (3认同)
  • @Justin,不,它不起作用。我们需要在 StartAsync 中返回 Task.CompletedTask 才能启动 Web 应用程序。实际工作可以包含在“任务”中。但是,人们可以只使用没有“IHostedService”的任务。我真的很想知道为什么这个答案获得了这么多票。 (2认同)

Jan*_*ann 7

这受到了skjagini's answer 中链接的文档的极大启发,并进行了一些改进。

我认为在此处重申整个示例可能会有所帮助,以防链接在某些时候中断。我做了一些调整;最值得注意的是,我注入了一个IServiceScopeFactory, 以允许后台进程自己安全地请求服务。我在这个答案的末尾解释了我的推理。


核心思想是创建一个任务队列,用户可以将其注入到他们的控制器中,然后将任务分配给它。长期运行的托管服务中存在相同的任务队列,该服务一次出列一个任务并执行它。

任务队列:

public interface IBackgroundTaskQueue
{
    // Enqueues the given task.
    void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task);

    // Dequeues and returns one task. This method blocks until a task becomes available.
    Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly ConcurrentQueue<Func<IServiceScopeFactory, CancellationToken, Task>> _items = new();

    // Holds the current count of tasks in the queue.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void EnqueueTask(Func<IServiceScopeFactory, CancellationToken, Task> task)
    {
        if(task == null)
            throw new ArgumentNullException(nameof(task));

        _items.Enqueue(task);
        _signal.Release();
    }

    public async Task<Func<IServiceScopeFactory, CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken)
    {
        // Wait for task to become available
        await _signal.WaitAsync(cancellationToken);

        _items.TryDequeue(out var task);
        return task;
    }
}
Run Code Online (Sandbox Code Playgroud)

在任务队列的核心,我们有一个线程安全的ConcurrentQueue<>. 由于我们不想在新任务可用之前轮询队列,因此我们使用一个SemaphoreSlim对象来跟踪队列中当前的任务数。每次我们调用 时Release,内部计数器都会递增。该WaitAsync方法会阻塞,直到内部计数器大于 0,然后将其递减。

为了出队和执行任务,我们创建了一个后台服务:

public class BackgroundQueueHostedService : BackgroundService
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly IServiceScopeFactory _serviceScopeFactory;
    private readonly ILogger<BackgroundQueueHostedService> _logger;

    public BackgroundQueueHostedService(IBackgroundTaskQueue taskQueue, IServiceScopeFactory serviceScopeFactory, ILogger<BackgroundQueueHostedService> logger)
    {
        _taskQueue = taskQueue ?? throw new ArgumentNullException(nameof(taskQueue));
        _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Dequeue and execute tasks until the application is stopped
        while(!stoppingToken.IsCancellationRequested)
        {
            // Get next task
            // This blocks until a task becomes available
            var task = await _taskQueue.DequeueAsync(stoppingToken);

            try
            {
                // Run task
                await task(_serviceScopeFactory, stoppingToken);
            }
            catch(Exception ex)
            {
                _logger.LogError(ex, "An error occured during execution of a background task");
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

最后,我们需要让我们的任务队列可用于依赖注入,并启动我们的后台服务:

public void ConfigureServices(IServiceCollection services)
{
    // ...
    
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
    services.AddHostedService<BackgroundQueueHostedService>();
    
    // ...
}
Run Code Online (Sandbox Code Playgroud)

我们现在可以将后台任务队列注入我们的控制器并使任务入队:

public class ExampleController : Controller
{
    private readonly IBackgroundTaskQueue _backgroundTaskQueue;

    public ExampleController(IBackgroundTaskQueue backgroundTaskQueue)
    {
        _backgroundTaskQueue = backgroundTaskQueue ?? throw new ArgumentNullException(nameof(backgroundTaskQueue));
    }

    public IActionResult Index()
    {
        _backgroundTaskQueue.EnqueueTask(async (serviceScopeFactory, cancellationToken) =>
        {
            // Get services
            using var scope = serviceScopeFactory.CreateScope();
            var myService = scope.ServiceProvider.GetRequiredService<IMyService>();
            var logger = scope.ServiceProvider.GetRequiredService<ILogger<ExampleController>>();
            
            try
            {
                // Do something expensive
                await myService.DoSomethingAsync(cancellationToken);
            }
            catch(Exception ex)
            {
                logger.LogError(ex, "Could not do something expensive");
            }
        });

        return Ok();
    }
}
Run Code Online (Sandbox Code Playgroud)

为什么使用IServiceScopeFactory?

理论上,我们可以直接使用我们注入控制器的服务对象。这可能适用于单例服务,也适用于大多数范围服务。

但是,对于实现IDisposable(例如,DbContext)的范围服务,这可能会中断:在将任务排队后,控制器方法返回并且请求完成。然后框架清理注入的服务。如果我们的后台任务足够慢或延迟,它可能会尝试调用已处理服务的方法,然后会遇到错误。

为了避免这种情况,我们排队的任务应该始终创建自己的服务范围,并且不应该使用来自周围控制器的服务实例。


skj*_*ini 5

Microsoft已在https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1中记录了相同内容

它使用BackgroundTaskQueue来完成,该任务从Controller分配工作,并且该工作由从BackgroundService派生的QueueHostedService执行。


归档时间:

查看次数:

14651 次

最近记录:

6 年,4 月 前