c# - DbContext 在BackgroundService 中释放

Jam*_*mil 3 c# rabbitmq entity-framework-core asp.net-core asp.net-core-hosted-services

我有一个 WebAPI,它也应该从 RabbitMQ 接收消息。我使用了这个教程,因为我知道有时 IIS 喜欢终止长时间运行的任务(虽然还没有在服务器上测试它,也许它不起作用)。我有一个处理通过 RabbitMQ 接收的消息的服务。我遇到的第一个问题 - 我无法将它注入到BackgroundService类中,所以我使用了IServiceScopeFactory. 现在,我必须使用来自两个队列的消息,据我了解,最佳实践是为此使用两个通道。但处理是在一项服务中完成的。后台服务:

public class ConsumeRabbitMQHostedService : BackgroundService
{
    private IConnection _connection;
    private IModel _firstChannel;
    private IModel _secondChannel;
    private RabbitConfigSection _rabbitConfig;
    public IServiceScopeFactory _serviceScopeFactory;

    public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
    {
        _rabbitConfig = rabbitConfig.Value;
        _serviceScopeFactory = serviceScopeFactory;
        InitRabbitMQ();
    }

    private void InitRabbitMQ()
    {
        var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };

        
        _connection = factory.CreateConnection();

        
        _firstChannel = _connection.CreateModel();

        _firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
        _firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _firstChannel.BasicQos(0, 1, false);

        _secondChannel = _connection.CreateModel();

        _secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
        _secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _secondChannel.BasicQos(0, 1, false);

        _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var firstConsumer = new EventingBasicConsumer(_firstChannel);
        var secondConsumer = new EventingBasicConsumer(_secondChannel);
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            firstConsumer.Received += (ch, ea) =>
            {
                // received message  
                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleFirstMessage(content, scoped);
                _firstChannel.BasicAck(ea.DeliveryTag, false);

            };
            firstConsumer.Shutdown += OnConsumerShutdown;
            firstConsumer.Registered += OnConsumerRegistered;
            firstConsumer.Unregistered += OnConsumerUnregistered;
            firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            _firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
        }
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            secondConsumer.Received += (ch, ea) =>
            {
                // received message  

                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleSecondMessage(content, scoped);
                _secondChannel.BasicAck(ea.DeliveryTag, false);
            };


            secondConsumer.Shutdown += OnConsumerShutdown;
            secondConsumer.Registered += OnConsumerRegistered;
            secondConsumer.Unregistered += OnConsumerUnregistered;
            secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;

            _secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
        }
        return Task.CompletedTask;
    }

    private void HandleFirstMessage(string content, IIntegrationService integrationService)
    {
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }

    private void HandleSecondMessage(string content, IIntegrationService integrationService)
    {
        List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
        integrationService.ImportSecond(importData);
    }

    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

    public override void Dispose()
    {
        _firstChannel.Close();
        _connection.Close();
        base.Dispose();
    }
}
Run Code Online (Sandbox Code Playgroud)

在服务中我得到

System.ObjectDisposeException:“无法访问已处置的上下文实例。导致此错误的一个常见原因是处置从依赖项注入解析的上下文实例,然后尝试在应用程序的其他位置使用相同的上下文实例。如果您在上下文实例上调用“Dispose”或将其包装在 using 语句中,则可能会发生这种情况。如果您使用依赖项注入,则应该让依赖项注入容器负责处理上下文实例。对象名称:“IntegrationDbContext”。

DbContext被注入到IIntegrationService. 如果我了解发生了什么,服务的两个实例(甚至一个)共享DbContext,当其中一个实例完成时,它会进行处置DbContext。我尝试不创建两个实例(一个实例中的所有代码using),尝试制作IIntegrationService瞬态,尝试异步执行所有操作(这是初始版本,使其同步测试)-仍然是相同的错误。我应该在这里做什么?这是正确的方法吗?

更新 1. ConfigureServicesStartup

        public void ConfigureServices(IServiceCollection services)
    {
        var rabbitConfigSection =
            Configuration.GetSection("Rabbit");
        services.Configure<RabbitConfigSection>(rabbitConfigSection);
        services.AddDbContext<SUNDbContext>(options =>
               options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));

        services.AddCors();
        services.AddSwaggerGen(c =>
        {
            c.SwaggerDoc("v1", new OpenApiInfo
            {
                Title = "My API",
                Version = "v1"
            });
        });
        services.AddRabbit(Configuration);
        services.AddHostedService<ConsumeRabbitMQHostedService>();
        services.AddControllers();
        services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
    }
Run Code Online (Sandbox Code Playgroud)

Dav*_*d L 6

该问题是由于以下事实引起的:由scope创建的外部_serviceScopeFactory.CreateScope()在每个 using 语句之后被释放,而每个消息仍然试图依赖现在释放的范围和附加的上下文来处理消息。

解决方案是在消息处理程序中为每条消息创建一个新范围:

private void HandleFirstMessage(string content)
{
    using (var scope = _serviceScopeFactory.CreateScope())
    {
        IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }
}
Run Code Online (Sandbox Code Playgroud)