NATS - 使用异步句柄函数订阅事件

niv*_*510 5 .net c# delegates asynchronous nats.io

我正在使用 NATS 机制在不同进程之间发布事件,当我订阅一个主题并提供处理该主题的异步操作时,如果处理该主题的操作抛出异常,我无法捕获它,因为我不这样做等待调用。

问题是“connection.subscribeAsync”参数如下:

delegate void EventHandler<TEventArgs>(object sender, TEventArgs e);

我使用自定义属性来确定该操作是异步操作还是同步操作

如果是异步的,我使用反射从 Action 构建任务;

但在这一点上,看起来我必须在两个糟糕的选择之间做出选择,

第一种是使用 GetAwaiter().GetResults();

第二个是执行方法 - async void 并用 try 和 catch 包装调用,这样我将捕获在同步上下文之前发生的异常。

我很想听到任何解决这些问题的建议。

代码快照:

async void MsgEvent(object sender, EncodedMessageEventArgs eArgs)
{
    try
    {
        if (IsAsyncAction(request.Action))
        {
            await (Task)request.Action.GetMethodInfo()
                    .Invoke(request.Action, new object[] { eArgs });

        }
        .
        .
        .

    }
    catch (Exception e)
    {
        _logger.LogError(e.Message);
    }
}

var subscription = connection.SubscribeAsync(request.Topic, MsgEvent);

    /// <summary>
    /// Expresses interest in the given <paramref name="subject" /> to the NATS Server, and begins delivering
    /// messages to the given event handler.
    /// </summary>
    /// <remarks>The <see cref="T:NATS.Client.IAsyncSubscription" /> returned will start delivering messages
    /// to the event handler as soon as they are received. The caller does not have to invoke
    /// <see cref="M:NATS.Client.IAsyncSubscription.Start" />.</remarks>
    /// <param name="subject">The subject on which to listen for messages.
    /// The subject can have wildcards (partial: <c>*</c>, full: <c>&gt;</c>).</param>
    /// <param name="handler">The <see cref="T:System.EventHandler`1" /> invoked when messages are received
    /// on the returned <see cref="T:NATS.Client.IAsyncSubscription" />.</param>
    /// <returns>An <see cref="T:NATS.Client.IAsyncSubscription" /> to use to read any messages received
    /// from the NATS Server on the given <paramref name="subject" />.</returns>
    /// <seealso cref="P:NATS.Client.ISubscription.Subject" />
    IAsyncSubscription SubscribeAsync(string subject, EventHandler<EncodedMessageEventArgs> handler);

Run Code Online (Sandbox Code Playgroud)

Ste*_*ary 3

问题是 ...

是的。核心问题是该库需要同步处理程序(void返回类型)并且不理解/正确处理异步处理程序(Task返回类型)。

首先,我会要求库作者允许异步处理程序。

我使用自定义属性来确定该操作是异步操作还是同步操作

这可能有点危险。如果您的意思是您有自己的属性类型,那么当然,只要您记得将其放在所有处理程序上,那就可以。代码分析器会很有帮助。如果您的意思是您正在检测方法的编译器属性async,那么我建议您不要这样做;如果编译器属性名称发生更改,或者处理程序本身没有更改async但调用了async方法,它就会中断。

看来我必须在两个糟糕的选择之间做出选择

是的。当您被迫进行异步同步时,您总是会在不好的选项之间进行选择。

第一种是使用 GetAwaiter().GetResults();

这是一种方法(“直接阻止黑客”)。有关其他一些方法,请参阅我关于棕地异步的文章。

第二个是执行方法 - async void 并用 try 和 catch 包装调用,这样我将捕获在同步上下文之前发生的异常。

那是行不通的。另一种方法是在每个方法添加顶级try/ 。您无法包装调用,因为异常已被状态机捕获并传递到同步上下文。它必须被捕获方法中。catch async voidasync voidasync void

可以定义一个SubscribeAsync带有处理程序的扩展方法Func<EncodedMessageEventArgs, Task>;您可以在lambda 中提供顶级try/并将其传递给原始.catchasync voidSubscribeAsync