.NET Core 2.1中的通用主机正常关闭

Gra*_*ver 10 .net c# .net-core asp.net-core

.NET Core 2.1引入了新的通用主机,该通用主机可以承载非HTTP工作负载以及Web主机的所有优点。当前,没有太多的信息和食谱,但是我以以下文章作为起点:

https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/generic-host?view=aspnetcore-2.1

https://docs.microsoft.com/zh-cn/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-2.1

https://docs.microsoft.com/zh-cn/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/background-tasks-with-ihostedservice

我的.NET Core应用程序启动,通过RabbitMQ消息代理侦听新请求,然后根据用户请求(通常通过控制台中的Ctrl + C)关闭。但是,关机并不是正常的-应用程序在将控制权返回给OS时仍具有未完成的后台线程。我通过控制台消息看到它-当我在控制台中按Ctrl + C时,我从应用程序中看到几行控制台输出,然后是OS命令提示符,然后又是我的应用程序中的控制台输出。

这是我的代码:

Program.cs

public class Program
{
    public static async Task Main(string[] args)
    {
        var host = new HostBuilder()
            .ConfigureHostConfiguration(config =>
            {
                config.SetBasePath(AppContext.BaseDirectory);
                config.AddEnvironmentVariables(prefix: "ASPNETCORE_");
                config.AddJsonFile("hostsettings.json", optional: true);
            })
            .ConfigureAppConfiguration((context, config) =>
            {
                var env = context.HostingEnvironment;
                config.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
                config.AddJsonFile($"appsettings.{env.EnvironmentName}.json", optional: true, reloadOnChange: true);
                if (env.IsProduction())
                    config.AddDockerSecrets();
                config.AddEnvironmentVariables();
            })
            .ConfigureServices((context, services) =>
            {
                services.AddLogging();
                services.AddHostedService<WorkerPoolHostedService>();
                // ... other services
            })
            .ConfigureLogging((context, logging) =>
            {
                if (context.HostingEnvironment.IsDevelopment())
                    logging.AddDebug();

                logging.AddSerilog(dispose: true);

                Log.Logger = new LoggerConfiguration()
                    .ReadFrom.Configuration(context.Configuration)
                    .CreateLogger();
            })
            .UseConsoleLifetime()
            .Build();

        await host.RunAsync();
    }
}
Run Code Online (Sandbox Code Playgroud)

WorkerPoolHostedService.cs

internal class WorkerPoolHostedService : IHostedService
{
    private IList<VideoProcessingWorker> _workers;
    private CancellationTokenSource _stoppingCts = new CancellationTokenSource();

    protected WorkerPoolConfiguration WorkerPoolConfiguration { get; }
    protected RabbitMqConfiguration RabbitMqConfiguration { get; }
    protected IServiceProvider ServiceProvider { get; }
    protected ILogger<WorkerPoolHostedService> Logger { get; }

    public WorkerPoolHostedService(
        IConfiguration configuration,
        IServiceProvider serviceProvider,
        ILogger<WorkerPoolHostedService> logger)
    {
        this.WorkerPoolConfiguration = new WorkerPoolConfiguration(configuration);
        this.RabbitMqConfiguration = new RabbitMqConfiguration(configuration);
        this.ServiceProvider = serviceProvider;
        this.Logger = logger;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var connectionFactory = new ConnectionFactory
        {
            AutomaticRecoveryEnabled = true,
            UserName = this.RabbitMqConfiguration.Username,
            Password = this.RabbitMqConfiguration.Password,
            HostName = this.RabbitMqConfiguration.Hostname,
            Port = this.RabbitMqConfiguration.Port,
            VirtualHost = this.RabbitMqConfiguration.VirtualHost
        };

        _workers = Enumerable.Range(0, this.WorkerPoolConfiguration.WorkerCount)
            .Select(i => new VideoProcessingWorker(
                connectionFactory: connectionFactory,
                serviceScopeFactory: this.ServiceProvider.GetRequiredService<IServiceScopeFactory>(),
                logger: this.ServiceProvider.GetRequiredService<ILogger<VideoProcessingWorker>>(),
                cancellationToken: _stoppingCts.Token))
            .ToList();

        this.Logger.LogInformation("Worker pool started with {0} workers.", this.WorkerPoolConfiguration.WorkerCount);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        this.Logger.LogInformation("Stopping working pool...");

        try
        {
            _stoppingCts.Cancel();
            await Task.WhenAll(_workers.SelectMany(w => w.ActiveTasks).ToArray());
        }
        catch (AggregateException ae)
        {
            ae.Handle((Exception exc) =>
            {
                this.Logger.LogError(exc, "Error while cancelling workers");
                return true;
            });
        }
        finally
        {
            if (_workers != null)
            {
                foreach (var worker in _workers)
                    worker.Dispose();
                _workers = null;
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

VideoProcessingWorker.cs

internal class VideoProcessingWorker : IDisposable
{
    private readonly Guid _id = Guid.NewGuid();
    private bool _disposed = false;

    protected IConnection Connection { get; }
    protected IModel Channel { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected ILogger<VideoProcessingWorker> Logger { get; }
    protected CancellationToken CancellationToken { get; }

    public VideoProcessingWorker(
        IConnectionFactory connectionFactory,
        IServiceScopeFactory serviceScopeFactory,
        ILogger<VideoProcessingWorker> logger,
        CancellationToken cancellationToken)
    {
        this.Connection = connectionFactory.CreateConnection();
        this.Channel = this.Connection.CreateModel();
        this.Channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        this.ServiceScopeFactory = serviceScopeFactory;
        this.Logger = logger;
        this.CancellationToken = cancellationToken;

        #region [ Declare ]

        // ...

        #endregion

        #region [ Consume ]

        // ...

        #endregion
    }

    // ... worker logic ...

    public void Dispose()
    {
        if (!_disposed)
        {
            this.Channel.Close(200, "Goodbye");
            this.Channel.Dispose();
            this.Connection.Close();
            this.Connection.Dispose();
            this.Logger.LogDebug("Worker {0}: disposed.", _id);
        }
        _disposed = true;
    }
}
Run Code Online (Sandbox Code Playgroud)

因此,当我按Ctrl + CI时,会在控制台中看到以下输出(没有请求处理时):

正在停止工作池...
命令提示符
工作者ID:已处置。

如何正常关机?

Gab*_*sch 8

你需要IApplicationLifetime。这为您提供了有关应用程序启动和关闭的所有必需信息。您甚至可以通过它触发关闭appLifetime.StopApplication();

看看https://github.com/aspnet/Docs/blob/66916c2ed3874ed9b000dfd1cab53ef68e84a0f7/aspnetcore/fundamentals/host/generic-host/samples/2.x/GenericHostSample/LifetimeEventsHostedService.cs

片段(如果链接无效):

public Task StartAsync(CancellationToken cancellationToken)
{
    appLifetime.ApplicationStarted.Register(OnStarted);
    appLifetime.ApplicationStopping.Register(OnStopping);
    appLifetime.ApplicationStopped.Register(OnStopped);

    return Task.CompletedTask;
}
Run Code Online (Sandbox Code Playgroud)

  • 谢谢,我已经阅读了有关此界面及其事件的信息。但是不是所有的关闭逻辑都应该在 IHostedService.StopAsync 中吗?当所有托管服务的 StopAsync 方法完成时,主机是否应该等待? (4认同)
  • 你是对的,我应该更仔细地阅读你的问题。我认为问题可能出在你最后的尝试上。您是否尝试过在finally中等待完成的任务(使用Task.Delay(Timeout.Infinite,cancelToken))? (3认同)
  • 不,不幸的是这没有帮助。当我在finally部分添加Task.Delay并尝试使用Ctrl+C停止主机时,它会打印“停止工作池”,然后出现命令提示符,然后挂起几秒钟并因OperationCanceledException而崩溃。我认为当主机不等待正常关闭并且即将开始“硬”关闭时,StopAsync 方法中的cancelToken 参数被取消。 (2认同)
  • 抱歉我的回答迟了。我没有收到通知。Task.Delay 抛出 OperationCanceledException 是预期的行为。只需捕获并处理OperationCanceledException。 (2认同)

Tim*_*ace 7

我将分享一些我认为非常适用于非 WebHost 项目的模式。

namespace MyNamespace
{
    public class MyService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IApplicationLifetime _appLifetime;

        public MyService(
            IServiceProvider serviceProvider,
            IApplicationLifetime appLifetime)
        {
            _serviceProvider = serviceProvider;
            _appLifetime = appLifetime;
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _appLifetime.ApplicationStopped.Register(OnStopped);

            return RunAsync(stoppingToken);
        }

        private async Task RunAsync(CancellationToken token)
        {
            while (!token.IsCancellationRequested)
            {
                using (var scope = _serviceProvider.CreateScope())
                {
                    var runner = scope.ServiceProvider.GetRequiredService<IMyJobRunner>();
                    await runner.RunAsync();
                }
            }
        }

        public void OnStopped()
        {
            Log.Information("Window will close automatically in 20 seconds.");
            Task.Delay(20000).GetAwaiter().GetResult();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

关于这个类的一些注意事项:

  1. 我正在使用 BackgroundService 抽象类来表示我的服务。它在 Microsoft.Extensions.Hosting.Abstractions 包中可用。我相信这计划在 .NET Core 3.0 中开箱即用。
  2. ExecuteAsync 方法需要返回一个代表正在运行的服务的 Task 。注意:如果您有一个同步服务,请在 Task.Run() 中包装您的“Run”方法。
  3. 如果你想为你的服务做额外的设置或拆卸,你可以注入应用程序生命周期服务并挂钩事件。我添加了一个要在服务完全停止后触发的事件。
  4. 因为您没有像在 MVC 项目中那样为每个 Web 请求创建新范围的自动魔法,所以您必须为范围服务创建自己的范围。将 IServiceProvider 注入服务来做到这一点。应使用 AddScoped() 将作用域上的所有依赖项添加到 DI 容器中。

在 Main( string[] args ) 中设置主机,以便在调用 CTRL+C / SIGTERM 时正常关闭:

IHost host = new HostBuilder()
    .ConfigureServices( ( hostContext, services ) =>
    {
        services.AddHostedService<MyService>();
    })
    .UseConsoleLifetime()
    .Build();

host.Run();  // use RunAsync() if you have access to async Main()
Run Code Online (Sandbox Code Playgroud)

我发现这组模式在 ASP.NET 应用程序之外工作得很好。

请注意,Microsoft 已针对 .NET Standard 进行构建,因此您无需使用 .NET Core 即可利用这些新的便利。如果您在 Framework 中工作,只需添加相关的 NuGet 包。该包是针对 .NET Standard 2.0 构建的,因此您需要使用 Framework 4.6.1 或更高版本。您可以在此处找到所有基础设施的代码,并随意浏览您正在使用的所有抽象的实现:https : //github.com/aspnet/Extensions