如何解决paho mqtt客户端的异步连接问题?

rai*_*san 2 java asynchronous mqtt paho

背景

我一直在为一个项目使用 MQTT,但遇到了一个奇怪的问题。我paho用作我的 MQTT 客户端和VerneMQ代理。

VerneMQ 代理服务已启动并正在运行,我可以通过 runnnig 确认这一点,netstat并且我可以看到该127.0.0.1:1883条目处于LISTENING模式中。

这是我的客户端代码:

public class Producer implements MqttCallback {

    private String brokerUri;
    private String clientId;

    public Producer(String brokerUri, String clientId){
        this.brokerUri = brokerUri;
        this.clientId = clientId;
    }

    public void doProduce(String topic, String payload){
        MemoryPersistence memoryPersistence = new MemoryPersistence();

        try {
            MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setCleanSession(true);
            mqttAsyncClient.setCallback(this);
            mqttAsyncClient.connect(mqttConnectOptions);
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setPayload(payload.getBytes());
            mqttAsyncClient.publish(topic, mqttMessage);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    public void connectionLost(Throwable throwable) {

    }


    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {

    }


    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("Message delivered!");
    }
}
Run Code Online (Sandbox Code Playgroud)

以下是我的主课

public class Main {
    public static void main(String[] args) {
        Producer producer = new Producer("tcp://127.0.0.1:1883", "producer1");
        producer.doProduce("dummyTopic", "dummyMessage");
    }
}
Run Code Online (Sandbox Code Playgroud)

问题

当我运行我的应用程序时,我Client is not connected (32104)在输出中看到异常。

如果我将行更改mqttAsyncClient.connect(mqttConnectOptions);mqttAsyncClient.connect(mqttConnectOptions).waitForCompletion();in Producerclass,我可以成功连接到代理,并且可以Message delivered!在输出中看到。

如果我没记错的话waitForCompletion()会阻塞呼叫直到收到响应。通过添加这一行,我有效地将 AsyncClient 连接更改为阻塞连接,这对我来说不是理想的方法。

如何解决此问题,以便 paho MQTT 客户端以非阻塞方式连接到代理?我在路上错过了什么吗?

har*_*llb 5

这包含在IMqttAsyncClient的文档中

 IMqttToken token method(parms, Object userContext, IMqttActionListener callback)
Run Code Online (Sandbox Code Playgroud)

在这种形式中,一个回调被注册到方法中。动作成功或失败时会通知回调。回调是在 MQTT 客户端管理的线程上调用的,因此在回调中最小化处理非常重要。否则将禁止 MQTT 客户端的操作。例如,在连接完成时收到通知(回调):

IMqttToken conToken;
  conToken = asyncClient.connect("some context", new MqttAsyncActionListener() {
      public void onSuccess(IMqttToken asyncActionToken) {
        log("Connected");
      }

      public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
        log ("connect failed" +exception);
      }
});
Run Code Online (Sandbox Code Playgroud)

可以将一个可选的上下文对象传递到该方法中,然后该方法将在回调中可用。上下文由 MQTT 客户端存储在令牌中,然后返回给调用者。令牌提供给回调方法,然后可以访问上下文。

所以你的 try/catch 块应该是这样的:

try {
  MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(brokerUri, clientId, memoryPersistence);
  MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  mqttConnectOptions.setCleanSession(true);
  mqttAsyncClient.setCallback(this);
  mqttAsyncClient.connect(mqttConnectOptions, null, new MqttAsyncActionListener() {
    public void onSuccess(IMqttToken asyncActionToken) {
      MqttMessage mqttMessage = new MqttMessage();
      mqttMessage.setPayload(payload.getBytes());
      mqttAsyncClient.publish(topic, mqttMessage);
    }

    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
      exception.printStackTrace();
    }
});

} catch (MqttException e) {
  e.printStackTrace();
}
Run Code Online (Sandbox Code Playgroud)