rai*_*san 2 java asynchronous mqtt paho
背景
我一直在为一个项目使用 MQTT,但遇到了一个奇怪的问题。我paho用作我的 MQTT 客户端和VerneMQ代理。
VerneMQ 代理服务已启动并正在运行,我可以通过 runnnig 确认这一点,netstat并且我可以看到该127.0.0.1:1883条目处于LISTENING模式中。
这是我的客户端代码:
public class Producer implements MqttCallback {
private String brokerUri;
private String clientId;
public Producer(String brokerUri, String clientId){
this.brokerUri = brokerUri;
this.clientId = clientId;
}
public void doProduce(String topic, String payload){
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void connectionLost(Throwable throwable) {
}
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("Message delivered!");
}
}
Run Code Online (Sandbox Code Playgroud)
以下是我的主课
public class Main {
public static void main(String[] args) {
Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
producer.doProduce("dummyTopic", "dummyMessage");
}
}
Run Code Online (Sandbox Code Playgroud)
问题
当我运行我的应用程序时,我Client is not connected (32104)在输出中看到异常。
如果我将行更改mqttAsyncClient.connect(mqttConnectOptions);为mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();in Producerclass,我可以成功连接到代理,并且可以Message delivered!在输出中看到。
如果我没记错的话waitForCompletion()会阻塞呼叫直到收到响应。通过添加这一行,我有效地将 AsyncClient 连接更改为阻塞连接,这对我来说不是理想的方法。
题
如何解决此问题,以便 paho MQTT 客户端以非阻塞方式连接到代理?我在路上错过了什么吗?
这包含在IMqttAsyncClient的文档中
Run Code Online (Sandbox Code Playgroud)IMqttToken token method(parms, Object userContext, IMqttActionListener callback)在这种形式中,一个回调被注册到方法中。动作成功或失败时会通知回调。回调是在 MQTT 客户端管理的线程上调用的,因此在回调中最小化处理非常重要。否则将禁止 MQTT 客户端的操作。例如,在连接完成时收到通知(回调):
Run Code Online (Sandbox Code Playgroud)IMqttToken conToken; conToken = asyncClient.connect("some context", new MqttAsyncActionListener() { public void onSuccess(IMqttToken asyncActionToken) { log("Connected"); } public void onFailure(IMqttToken asyncActionToken, Throwable exception) { log ("connect failed" +exception); } });可以将一个可选的上下文对象传递到该方法中,然后该方法将在回调中可用。上下文由 MQTT 客户端存储在令牌中,然后返回给调用者。令牌提供给回调方法,然后可以访问上下文。
所以你的 try/catch 块应该是这样的:
try {
MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(true);
mqttAsyncClient.setCallback(this);
mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload.getBytes());
mqttAsyncClient.publish(topic, mqttMessage);
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
exception.printStackTrace();
}
});
} catch (MqttException e) {
e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5051 次 |
| 最近记录: |