我正在尝试学习 MQTT 并且一直在玩它。我编写了一个用于发布的客户端和一个用于订阅的客户端(见下文)。
如果我运行订阅客户端,然后运行发布客户端(在订阅运行时),那么一切正常。我的订阅客户端正确接收发布到主题的消息。
但是,如果我先运行发布客户端(即我向主题发布消息),然后运行订阅客户端,则不会收到任何消息。
换句话说,如果我先与子客户端连接,然后在连接子客户端时与 pub 客户端发布消息,则一切正常。但是,如果我先发布消息,然后再连接到我的子客户端,则不会收到任何消息。我的理解是,一旦我与客户端连接并订阅该主题,我就应该收到该主题上存在的消息。
我发现了一个类似的问题:无法接收已发布的消息到 mqtt paho 上的订阅主题,尽管这种情况似乎有点不同。我尝试更改不同的 QoS 设置或 cleanSession 标志,但这并没有解决问题。
任何帮助,将不胜感激!
发布客户端:
public class MQTT_Client_Pub implements MqttCallback{
MqttClient client;
public static void main(String[] args) {
new MQTT_Client_Pub().mqttPub();
}
public void mqttPub(){
try {
this.setConnection();
// Connect
client.connect();
// Create new message
MqttMessage message = new MqttMessage();
message.setPayload("A single test message from b112358".getBytes());
message.setQos(0);
// Publish message to a topic
System.out.println("Publishing a message.");
client.publish("pahodemo/test/b112358", message);
// Disconnect
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
} catch (Exception e){
e.printStackTrace();
}
}
public void setConnection(){
// Client
try{
client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub");
} catch (MqttException e) {
e.printStackTrace();
}
// Connection Options
MqttConnectOptions options = new MqttConnectOptions();
// Set the will
options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
// Set Callback
client.setCallback(this);
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("Message delivered to the broker.");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {}
public void connectionLost(Throwable cause) {}
Run Code Online (Sandbox Code Playgroud)
}
订阅客户端:
public class MQTT_Client_Sub implements MqttCallback{
MqttClient client;
public static void main(String[] args) {
new MQTT_Client_Sub().mqttSub();
}
public void mqttSub(){
try {
// Set connection
this.setConnection();
// Connect
client.connect();
// Subscribe
client.subscribe("pahodemo/test/b112358", 0);
// Disconnect
// client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
public void setConnection(){
try {
// Client
client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub");
} catch (MqttException e) {
e.printStackTrace();
}
// Connection Options
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
// Set the will
options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true);
client.setCallback(this);
}
public void deliveryComplete(IMqttDeliveryToken token) {}
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message Arrived: " + message.getPayload() + " on tipic: " + topic.getBytes());
}
public void connectionLost(Throwable cause) {}
Run Code Online (Sandbox Code Playgroud)
}
在订阅者连接和订阅之前发布的消息只会在以下两种情况下传递
当消息被发布为保留。这意味着该主题的最后一条消息将在订阅时传递给新订阅者。这只会传递最后一条消息。
如果客户端先前已连接并订阅,则已断开连接。然后发布一条消息,客户端使用 cleansession = false 再次连接。(当订阅在 QOS1/2 时)
这可能会有所帮助:http : //www.thingsprime.com/?p=2897