kei*_*ala 3 c# mqtt asp.net-core mqttnet
在我的 ASP.Net Core 6 应用程序中,名为 MqttClientService 的 BackgroundService 任务运行一个 MQTTNet 客户端,该客户端处理传入的 mqqt 消息并用消息进行响应以指示它已成功。
我已经从 MQTTNet 存储库获取了示例控制台应用程序来使用Console.ReadLine(),但这对于我的用例来说感觉就像是一种黑客攻击。有没有更好的方法来让BackgroundService处理传入的消息而不需要不断地重新启动?
有一个使用 Asp.Net Core 和 MQTTNet 版本 3 的示例,但它使用由接口实现的句柄,而不是库现在使用的异步事件:MQTTNet 的升级指南。
任何信息将不胜感激,谢谢。
Services/ 中的 MqttClientService.cs
using MQTTnet;
using MQTTnet.Client;
using System.Text;
namespace MqttClientAspNetCore.Services
{
public class MqttClientService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await Handle_Received_Application_Message();
}
}
public static async Task Handle_Received_Application_Message()
{
var mqttFactory = new MqttFactory();
using (var mqttClient = mqttFactory.CreateMqttClient())
{
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("test.mosquitto.org")
.Build();
// Setup message handling before connecting so that queued messages
// are also handled properly.
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
// Publish successful message in response
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("keipalatest/1/resp")
.WithPayload("OK")
.Build();
mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
Console.WriteLine("MQTT application message is published.");
return Task.CompletedTask;
};
await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic("keipalatest/1/post");
f.WithAtLeastOnceQoS();
})
.Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
Console.WriteLine("MQTT client subscribed to topic.");
// The line below feels like a hack to keep background service from restarting
Console.ReadLine();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
程序.cs
using MqttClientAspNetCore.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<MqttClientService>();
var app = builder.Build();
// To check if web server is still responsive
app.MapGet("/", () =>
{
return "Hello World";
});
app.Run();
Run Code Online (Sandbox Code Playgroud)
不需要 Console.ReadLine 甚至循环。当 ExecuteAsync 返回时, BackgroundService 应用程序代码不会终止。如果您希望应用程序在终止时终止,ExecuteAsync则必须通过IApplicationLifecycle界面实际告诉它。
当我第一次尝试使用通用主机作为命令行工具时,我发现这一点很困难。这似乎永远挂着......
ExecuteAsync可用于设置 MQTT 客户端和事件处理程序并让它们工作。该代码仅在StopAsync被调用时终止。即使如此,这也是通过发出取消令牌信号来完成的,而不是通过中止某些工作线程来完成。
客户端本身可以在构造函数中构建,例如使用配置设置。只ConnectAsync需要调用即可ExecuteAsync。
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _client.ConnectAsync(_clientOptions, CancellationToken.None);
_logger.LogInformation("Connected");
await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
_logger.LogInformation("Subscribed");
}
Run Code Online (Sandbox Code Playgroud)
当调用StopAsync并触发取消令牌时,服务代码将停止。stoppingToken.Register可以用于_client.DisconnectAsync在发生这种情况时进行调用,但Register不接受异步委托。更好的选择是覆盖StopAsync自身:
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
await _client.DisconnectAsync();
await base.StopAsync(cancellationToken);
}
Run Code Online (Sandbox Code Playgroud)
构造函数可以创建客户端并注册消息处理程序
public class MqttClientService : BackgroundService
{
ILogger<MqttClientService> _logger;
IMqttClient _client=client;
MqttClientOptions _clientOptions;
MqttSubscriptionOptions _subscriptionOptions;
string _topic;
public MqttClientService(IOptions<MyMqttOptions> options,
ILogger<MqttClientService> logger)
{
_logger=logger;
_topic=options.Value.Topic;
var factory = new MqttFactory();
_client = factory.CreateMqttClient();
_clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(options.Value.Address)
.Build();
_subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic(options.Value.Topic);
f.WithAtLeastOnceQoS();
})
.Build();
_client.ApplicationMessageReceivedAsync += HandleMessageAsync;
}
Run Code Online (Sandbox Code Playgroud)
接收到的消息由以下方法处理HandleMessageAsync:
async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
{
var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
_logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(_topic)
.WithPayload("OK")
.Build();
await _client.PublishAsync(applicationMessage, CancellationToken.None);
_logger.LogInformation("MQTT application message is published.");
}
Run Code Online (Sandbox Code Playgroud)
最后,由于BackgroundService实现了IDisposable,我们可以使用Dispose来处理_client实例:
public void Dispose()
{
Dispose(true);
}
protected virtual Dispose(bool disposing)
{
if(disposing)
{
_client.Dispose();
base.Dispose();
}
_client=null;
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2313 次 |
| 最近记录: |