通过回调消息代理将推送公开为 IAsyncEnumerable

all*_*ran 1 c# asynccallback iasyncenumerable system.threading.channels

我正在使用一个第三方库,它充当发布-订阅消息代理的接口。经纪人是 Solace PubSub+。

对于订阅者,供应商库采用“通过回调推送消息”模式。

我正在围绕供应商库编写一个自己的包装器库,以便其他开发人员更容易使用(隐藏库与网络通信方式的所有内部结构等)。

同样,我认为将订户提要公开为 可能会有所帮助IAsyncEnumerable,并且我认为这可能是 的一个很好的用例System.Threading.Channels。我有两个担忧:

  1. 渠道在这里是否合适,或者我是否过度设计了?即,是否有更“C# 惯用”的方式来包装回调?
  2. 我的包装器实现安全吗EnumerableBroker,还是我在某个地方陷入了异步陷阱?

我意识到第一个问题可能比 SO 更适合 CodeReview,但由于该问题的答案也与第二个问题相关,因此将它们放在一起似乎是合适的。值得注意的是:我正在避免使用IObservable/Rx,因为我的目标是让我的界面比供应商的界面基础,而不是要求其他开发人员和我自己学习 Rx!对于中间的通道,理解生产者和消费者进程如何独立也是微不足道的,而对于可观察的,我的第一个心理过程是“好吧,那么生产者和消费者仍然是独立的吗?乍一看,我必须现在就了解调度程序...天哪,我只使用一个怎么样await foreach?”

这是消费消息的最小模型,没有EnumerableBroker

// mockup of third party class
private class Broker
{
    // mockup of how the third party library pushes messages via callback
    public void Subscribe(EventHandler<int> handler) => this.handler = handler;

    //simulate the broker pushing messages. Not "real" code
    public void Start()
    {
        Task.Run
        (
            () =>
            {
                for (int i = 0; !cts.Token.IsCancellationRequested; i++)
                {
                    // simulate internal latency
                    Thread.Sleep(10);
                    handler?.Invoke(this, i);
                }
            }, cts.Token
        );
    }

    public void Stop() => cts.Cancel();

    private CancellationTokenSource cts = new();
    private EventHandler<int> handler;
}

private static async Task Main()
{
    var broker = new Broker();
    broker.Subscribe((_, msg) => Console.WriteLine(msg));
    broker.Start();
    await Task.Delay(1000);
    broker.Stop();
}
Run Code Online (Sandbox Code Playgroud)

现在有一个最小的复制EnumerableBroker(仍然使用上面列出的相同的模拟Broker类)。这里至少有一个好处似乎是,如果订阅者需要做大量工作来处理消息,它不会占用代理的线程 - 至少在缓冲区填满之前是这样。这似乎没有错误,但我学会了对我对异步的有限掌握保持警惕。

private class EnumerableBroker
{
    public EnumerableBroker(int bufferSize = 8)
    {
        buffer = Channel.CreateBounded<int>
        (
            new BoundedChannelOptions(bufferSize) { SingleReader = true,
                SingleWriter = true }
        );
    }

    public IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
    {
        broker.Subscribe
        (
            // switched to sync per Theodor's comments
            (_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()
        );
        ct.Register(broker.Stop);
        broker.Start();
        return buffer.Reader.ReadAllAsync(ct);
    }

    private readonly Channel<int> buffer;
    private readonly Broker broker = new();
}

private static async Task Main()
{
    var cts = new CancellationTokenSource();
    var broker = new EnumerableBroker();
    cts.CancelAfter(1000);
    try
    {
        await foreach (var msg in broker.ReadAsync(cts.Token))
        {
            Console.WriteLine(msg);
        }
    }
    catch (OperationCanceledException) { }
}
Run Code Online (Sandbox Code Playgroud)

The*_*ias 5

我是不是过度设计了?

不会。AChannel正是您实现此功能所需的组件类型。这是一个非常简单的机制。它基本上是该类的异步版本BlockingCollection<T>,具有一些额外的功能(如Completion属性)和精美的 API(Reader外观Writer)。

我的 EnumerableBroker 包装器实现是否安全,或者我是否陷入了某个地方的异步陷阱?

是的,有一个陷阱,而你已经掉进去了。该SingleWriter = true配置意味着最多WriteAsync允许同时进行一项操作。在发出下一个之前WriteAsync,必须先完成上一个。broker通过使用委托订阅async void,您实质上是为代理推送的每条消息创建一个单独的编写器(生产者)。最有可能的是,该组件会通过抛出 s 或其他东西来抱怨这种误用InvalidOperationException。解决方案是切换到SingleWriter = falsethough。这将通过创建一个外部且效率极低的队列来规避 的有限容量,Channel其中的消息不适合 的内部队列Channel。解决方案是重新考虑您的缓冲策略。如果您无法缓冲无限数量的消息,则必须删除消息,或者抛出异常并杀死消费者。相反await buffer.Writer.WriteAsync,最好与 同步馈送通道,并在出现 的bool accepted = buffer.Writer.TryWrite情况下采取适当的操作。acceptedfalse

您应该记住的另一个考虑因素是该ChannelReader.ReadAllAsync方法非常消耗资源。这意味着,如果同一通道有多个读者/消费者,则每条消息将仅传递给其中一个消费者。换句话说,每个消费者将收到通道消息的部分子集。IAsyncEnumerable<T>您应该将这一点传达给您的同事,因为多次枚举相同内容是非常简单的。毕竟 anIAsyncEnumerable<T>无非是IAsyncEnumerator<T>s 的一个工厂。

最后,您不必通过 a 控制每个订阅的生命周期CancellationToken,只需在 an 枚举终止时自动终止订阅,即可让同事的生活变得更轻松IAsyncEnumerator<T>。当await foreach循环以任何方式结束时(例如通过break或通过异常),关联的IAsyncEnumerator<T>将自动释放。如果 try/finally 块包装了屈服循环,C# 语言就巧妙地将DisposeAsync调用与finally迭代器块挂钩。您可以像这样利用这个强大的功能:

public async IAsyncEnumerable<int> ReadAsync(CancellationToken ct)
{
    broker.Subscribe
    (
        //...
    );
    broker.Start();
    try
    {
        await foreach (var msg in buffer.Reader.ReadAllAsync(ct))
        {
            yield return msg;
        }
    }
    finally
    {
        broker.Stop();
    }
}
Run Code Online (Sandbox Code Playgroud)