如何在 ASP.NET Core 应用程序中连续侦听 Pub/Sub 消息?

Mar*_*cze 5 c# google-cloud-platform google-cloud-pubsub asp.net-core

我想实现一个 ASP.NET Core API,它不响应 HTTP 请求,但在启动时开始侦听 Google Cloud Pub/Sub 消息,并且在其整个生命周期中无限期地侦听。

使用官方发布/订阅 SDK 实现此功能的首选方法是什么?

我可以想到两种方法:

方法 1:只需使用SimpleSubscriber, 并在Startup.Configure开始收听消息时:

public void Configure(IApplicationBuilder app)
{
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
    var receivedMessages = new List<PubsubMessage>();

    simpleSubscriber.StartAsync((msg, cancellationToken) =>
    {
        // Process the message here.

        return Task.FromResult(SimpleSubscriber.Reply.Ack);
    });

    ...
}
Run Code Online (Sandbox Code Playgroud)

方法二:使用专门创建的库来定期运行作业,例如 Quartz、Hangfire 或 FluentScheduler,每次触发作业时,使用SubscriberClient.

哪一种是首选方法?第一个看起来更简单,但我不确定它是否真的可靠。

Chr*_*ris 4

第一种方法绝对是它的用途。

但是,请参阅以下文档StartAsync

开始接收消息。当调用或者发生不可恢复的错误时,返回Task完成 。每个 实例不能多次调用此方法。StopAsync(CancellationToken)SubscriberClient

因此,您确实需要处理不可StartAsync恢复错误导致的意外关闭。最简单的方法是使用外部循环,尽管考虑到这些错误被认为是不可恢复的,但在成功之前可能需要更改调用的某些内容。

代码可能如下所示:

while (true)
{
    // Each SubscriberClientinstance must only be used once.
    var subscriberClient = await SubscriberClient.CreateAsync(subscriptionName);
    try
    {
        await subscriberClient.StartAsync((msg, cancellationToken) =>
        {
            // Process the message here.
            return Task.FromResult(SimpleSubscriber.Reply.Ack);
        });
    }
    catch (Exception e)
    {
        // Handle the unrecoverable error somehow...
    }
}
Run Code Online (Sandbox Code Playgroud)

如果这不能按预期工作,请告诉我们

编辑SimpleSubscriber已在库中重命名为SubscriberClient,因此答案已进行相应编辑。

  • 请注意,SimpleSubscriber 已重命名为 SubscriberClient。新的 GitHub 链接:https://github.com/googleapis/google-cloud-dotnet/blob/master/apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs (2认同)