.Net Core:“范围”依赖注入的自定义范围。一个控制器

Nix*_*xon 8 scope dependency-injection message-queue amqp .net-core

我有一个应用程序,它不通过控制器接收普通 HTTP 请求,而是侦听并接收消息(AMQP 协议)以启动其逻辑流。

我的应用程序一次可能会接收和处理多于 1 条消息。我有一个对象,它将在整个过程中在几个不同的服务/类中收集信息/数据,以便我最终使用它。但我需要根据收到的消息将数据分开,因为“范围”注入会将注入的实例与其他 HTTP 请求分开。

因此,我的用例与我在普通 API 中使用 Scoped 注入对象的方式非常相似,但我在侦听器中收到一条消息,而不是新的 HTTP 请求。

有什么方法可以为收到的每条消息创建一个自定义范围,无论是通过某种配置,还是让代码创建一个新范围作为我的 Listener.MessageReceived(Message message) 方法中的第一件事?

想象一下这样的流程:

public class Listener {
    ServiceClassA serviceClassA //injected in constructor
    CustomLogger customLogger // (HAS TO BE SAME OBJECT INJECTED INTO ServiceClassA, ServiceClassB and Listener)

    public void ReceiveMessage(Message message) {
        using (var scope = CreateNewScope()) {
            try {
                serviceClassA.DoStuff();
            } catch(Exception e) {
                Console.Write(customLogger.GetLogs())
            }
        }
    }
}


public class ServiceClassA {
    ServiceClassB serviceClassB //injected in constructor
    CustomLogger customLogger //(HAS TO BE SAME OBJECT INJECTED INTO ServiceClassA, ServiceClassB and Listener)

    public void DoStuff() {
        customLogger = ResolveCustomLogger(); // how do I make sure I can get/resolve the same object as in Listener (without having to pass parameters)
        var data = // does stuff
        customLogger.Log(data);

        serviceClassB.DoStuff();
    }
}


public class ServiceClassB {
    CustomLogger customLogger //(HAS TO BE SAME OBJECT INJECTED INTO ServiceClassA, ServiceClassB and Listener)

    public void DoStuff() {
        customLogger = ResolveCustomLogger(); // how do I make sure I can get/resolve the same object as in Listener (without having to pass parameters)
        var data = // does other stuff
        customLogger.Log(data);
    }
}
Run Code Online (Sandbox Code Playgroud)

我的 CustomLogger 可能不仅在 1 或 2 个服务层以下使用,可能有很多层,我可能只想在底部使用 CustomLogger,但我希望之后可以在顶层访问它,以检索存储的数据在里面。

非常感谢。

Jor*_*elt 7

您可以在类中注入一个 ServiceScopyFactory 来对队列中的消息做出反应,然后对于它收到的每条消息,它可以创建一个作用域,从中请求 MessageHandler 依赖项。

下面的代码示例正是这样做的(它还处理队列上的会话,但这对于创建作用域应该没有什么区别)。

public class SessionHandler : ISessionHandler
{
    public readonly string SessionId;
    private readonly ILogger<SessionHandler> Logger;
    private readonly IServiceScopeFactory ServiceScopeFactory;

    readonly SessionState SessionState;

    public SessionHandler(
        ILogger<SessionHandler> logger,
        IServiceScopeFactory serviceScopeFactory,
        string sessionId)
    {
        Logger = logger;
        ServiceScopeFactory = serviceScopeFactory;
        SessionId = sessionId
        SessionState = new SessionState();
    }

    public async Task HandleMessage(IMessageSession session, Message message, CancellationToken cancellationToken)
    {
        Logger.LogInformation($"Message of {message.Body.Length} bytes received.");


        // Deserialize message
        bool deserializationSuccess = TryDeserializeMessageBody(message.Body, out var incomingMessage);

        if (!deserializationSuccess)
            throw new NotImplementedException(); // Move to deadletter queue?


        // Dispatch message
        bool handlingSuccess = await HandleMessageWithScopedHandler(incomingMessage, cancellationToken);

        if (!handlingSuccess)
            throw new NotImplementedException(); // Move to deadletter queue?
    }

    /// <summary>
    /// Instantiate a message handler with a service scope that lasts until the message handling is done.
    /// </summary>
    private async Task<bool> HandleMessageWithScopedHandler(IncomingMessage incomingMessage, CancellationToken cancellationToken)
    {
        try
        {
            using IServiceScope messageHandlerScope = ServiceScopeFactory.CreateScope();
            var messageHandlerFactory = messageHandlerScope.ServiceProvider.GetRequiredService<IMessageHandlerFactory>();
            var messageHandler = messageHandlerFactory.Create(SessionState);

            await messageHandler.HandleMessage(incomingMessage, cancellationToken);

            return true;
        }
        catch (Exception exception)
        {
            Logger.LogError(exception, $"An exception occurred when handling a message: {exception.Message}.");
            return false;
        }
    }

    private bool TryDeserializeMessageBody(byte[] body, out IncomingMessage? incomingMessage)
    {
        incomingMessage = null;

        try
        {
            incomingMessage = IncomingMessage.Deserialize(body);
            return true;
        }
        catch (MessageDeserializationException exception)
        {
            Logger.LogError(exception, exception.Message);    
        }

        return false;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,每当实例化 MessageHandlerFactory(从队列接收到的每条消息都会发生这种情况)时,工厂请求的任何作用域依赖项都将一直存在,直到 MessageHandler.HandleMessage() 任务完成。

我创建了一个消息处理程序工厂,以便 SessionHandler 除了 DI 服务之外还可以将非 DI 服务参数传递给 MessageHandler 的构造函数(在本例中为 SessionState 对象)。工厂请求(范围内的)依赖项并将它们传递给 MessageHandler。如果您不使用会话,那么您可能不需要工厂,您可以直接从范围中获取 MessageHandler。