如何在没有Console.ReadLine()的情况下让BackgroundService任务接收mqtt消息

kei*_*ala 3 c# mqtt asp.net-core mqttnet

在我的 ASP.Net Core 6 应用程序中,名为 MqttClientService 的 BackgroundService 任务运行一个 MQTTNet 客户端,该客户端处理传入的 mqqt 消息并用消息进行响应以指示它已成功。

我已经从 MQTTNet 存储库获取了示例控制台应用程序来使用Console.ReadLine(),但这对于我的用例来说感觉就像是一种黑客攻击。有没有更好的方法来让BackgroundService处理传入的消息而不需要不断地重新启动?

一个使用 Asp.Net Core 和 MQTTNet 版本 3 的示例,但它使用由接口实现的句柄,而不是库现在使用的异步事件:MQTTNet 的升级指南

任何信息将不胜感激,谢谢。

Services/ 中的 MqttClientService.cs

using MQTTnet;
using MQTTnet.Client;
using System.Text;

namespace MqttClientAspNetCore.Services
{
    public class MqttClientService : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Handle_Received_Application_Message();
            }
        }

        public static async Task Handle_Received_Application_Message()
        {

            var mqttFactory = new MqttFactory();

            using (var mqttClient = mqttFactory.CreateMqttClient())
            {
                var mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer("test.mosquitto.org")
                    .Build();

                // Setup message handling before connecting so that queued messages
                // are also handled properly. 
                mqttClient.ApplicationMessageReceivedAsync += e =>
                {
                    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

                    // Publish successful message in response
                    var applicationMessage = new MqttApplicationMessageBuilder()
                        .WithTopic("keipalatest/1/resp")
                        .WithPayload("OK")
                        .Build();

                    mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

                    Console.WriteLine("MQTT application message is published.");

                    return Task.CompletedTask;
                };

                await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

                var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic("keipalatest/1/post");
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();

                await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

                Console.WriteLine("MQTT client subscribed to topic.");
                // The line below feels like a hack to keep background service from restarting
                Console.ReadLine();
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

程序.cs

using MqttClientAspNetCore.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHostedService<MqttClientService>();

var app = builder.Build();

// To check if web server is still responsive
app.MapGet("/", () =>
{
    return "Hello World";
});


app.Run();
Run Code Online (Sandbox Code Playgroud)

Pan*_*vos 5

不需要 Console.ReadLine 甚至循环。当 ExecuteAsync 返回时, BackgroundService 应用程序代码不会终止。如果您希望应用程序在终止时终止,ExecuteAsync则必须通过IApplicationLifecycle界面实际告诉它。

当我第一次尝试使用通用主机作为命令行工具时,我发现这一点很困难。这似乎永远挂着......

ExecuteAsync可用于设置 MQTT 客户端和事件处理程序并让它们工作。该代码仅在StopAsync被调用时终止。即使如此,这也是通过发出取消令牌信号来完成的,而不是通过中止某些工作线程来完成。

客户端本身可以在构造函数中构建,例如使用配置设置。只ConnectAsync需要调用即可ExecuteAsync

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{

    await _client.ConnectAsync(_clientOptions, CancellationToken.None);
    _logger.LogInformation("Connected");

    await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
    _logger.LogInformation("Subscribed");
}
Run Code Online (Sandbox Code Playgroud)

当调用StopAsync并触发取消令牌时,服务代码将停止。stoppingToken.Register可以用于_client.DisconnectAsync在发生这种情况时进行调用,但Register不接受异步委托。更好的选择是覆盖StopAsync自身:

public virtual async Task StopAsync(CancellationToken cancellationToken)
{
    await _client.DisconnectAsync();
    await base.StopAsync(cancellationToken);
}
Run Code Online (Sandbox Code Playgroud)

构造函数可以创建客户端并注册消息处理程序

public class MqttClientService : BackgroundService
{
    ILogger<MqttClientService> _logger;
    IMqttClient _client=client;

    MqttClientOptions _clientOptions;
    MqttSubscriptionOptions _subscriptionOptions;    
    string _topic;

    public MqttClientService(IOptions<MyMqttOptions> options, 
                            ILogger<MqttClientService> logger)
    {
        _logger=logger;
        _topic=options.Value.Topic;
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        _clientOptions = new MqttClientOptionsBuilder()
                        .WithTcpServer(options.Value.Address)
                        .Build();
        _subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic(options.Value.Topic);
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();
        _client.ApplicationMessageReceivedAsync += HandleMessageAsync;
    }
Run Code Online (Sandbox Code Playgroud)

接收到的消息由以下方法处理HandleMessageAsync

async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
{
    var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    _logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
    var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(_topic)
                    .WithPayload("OK")
                    .Build();

    await _client.PublishAsync(applicationMessage, CancellationToken.None);

    _logger.LogInformation("MQTT application message is published.");
}
Run Code Online (Sandbox Code Playgroud)

最后,由于BackgroundService实现了IDisposable,我们可以使用Dispose来处理_client实例:

public void Dispose()
{
    Dispose(true);
}

protected virtual Dispose(bool disposing)
{
    if(disposing)
    {
        _client.Dispose();
        base.Dispose();
    }
    _client=null;
}
Run Code Online (Sandbox Code Playgroud)