Rad*_*ioo 2 java multithreading executorservice runnable mqtt
我目前正在与Eclipse Paho一起开发MQTT客户端服务,以使用更大的软件,并且遇到性能问题。我收到了很多要发布给代理的事件,并且正在使用GSON对这些事件进行序列化。我已经对序列化和发布进行了多线程处理。根据基本基准,序列化和发布最多需要1毫秒。我使用的ExecutorService的线程池大小为10(目前)。
我的代码当前每秒向ExecutorService提交大约50个Runnable,但是我的代理每秒仅报告5-10条消息。我之前已经对MQTT设置进行了基准测试,并设法以非多线程方式每秒发送约9000条以上的MQTT消息。
线程池是否有这么多的开销,我只能从中得到这么少的发布?
public class MqttService implements IMessagingService{
protected int PORT = 1883;
protected String HOST = "localhost";
protected final String SERVICENAME = "MQTT";
protected static final String COMMANDTOPIC = "commands";
protected static final String REMINDSPREFIX = "Reminds/";
protected static final String VIOLATIONTOPIC = "violations/";
protected static final String WILDCARDTOPIC = "Reminds/#";
protected static final String TCPPREFIX = "tcp://";
protected static final String SSLPREFIX = "ssl://";
private MqttClient client;
private MqttConnectOptions optionsPublisher = new MqttConnectOptions();
private ExecutorService pool = Executors.newFixedThreadPool(10);
public MqttService() {
this("localhost", 1883);
}
public MqttService(String host, int port) {
this.HOST = host;
this.PORT = port;
}
@Override
public void setPort(int port) {
this.PORT = port;
}
@Override
public void setHost(String host) {
this.HOST = host;
}
@Override
public void sendMessage(AbstractMessage message) {
pool.submit(new SerializeJob(client,message));
}
@Override
public void connect() {
try {
client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
client.connect(optionsPublisher);
client.setCallback(new MessageCallback());
client.subscribe(WILDCARDTOPIC, 0);
} catch (MqttException e1) {
e1.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
以下代码是ExecutorService执行的Runnable。但是,这本身不应该是一个问题,因为它只需要1-2毫秒即可完成。
class SerializeJob implements Runnable {
private AbstractMessage message;
private MqttClient client;
public SerializeJob(MqttClient client, AbstractMessage m) {
this.client = client;
this.message = m;
}
@Override
public void run() {
String serializedMessage = MessageSerializer.serializeMessage(message);
MqttMessage wireMessage = new MqttMessage();
wireMessage.setQos(message.getQos());
wireMessage.setPayload(serializedMessage.getBytes());
if (client.isConnected()) {
StringBuilder topic = new StringBuilder();
topic.append(MqttService.REMINDSPREFIX);
topic.append(MqttService.VIOLATIONTOPIC);
try {
client.publish(topic.toString(), wireMessage);
} catch (MqttPersistenceException e) {
e.printStackTrace();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我不太确定是什么使我退缩。MQTT本身似乎允许许多事件,这些事件也可能具有很大的负载,并且网络也不可能成为问题,因为我当前将代理本地托管在与客户端相同的机器上。
编辑并进行进一步测试:
现在,我已经对自己的安装程序进行了综合基准测试,该安装程序由本地托管的HiveMQ和Mosquitto经纪人组成,它们“自然”地在计算机上运行。使用Paho库,我已经按1000个批次发送了越来越多的消息。对于每个批次,我计算了从第一个消息到最后一个消息的消息吞吐量。这种情况下没有使用任何多线程。为此,我得出了以下性能图表:
同时运行客户端和代理的计算机是具有i7 6700和32 GB RAM的台式机。代理可以访问其VM的所有内核和8 GB内存。
为了进行基准测试,我使用了以下代码:
import java.util.Random;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
public class MqttBenchmarker {
protected static int PORT = 1883;
protected static String HOST = "localhost";
protected final String SERVICENAME = "MQTT";
protected static final String COMMANDTOPIC = "commands";
protected static final String REMINDSPREFIX = "Reminds/";
protected static final String VIOLATIONTOPIC = "violations/";
protected static final String WILDCARDTOPIC = "Reminds/#";
protected static final String TCPPREFIX = "tcp://";
protected static final String SSLPREFIX = "ssl://";
private static MqttClient client;
private static MqttConnectOptions optionsPublisher = new MqttConnectOptions();
private static String IDPublisher = MqttClient.generateClientId();
private static int messageReceived = 0;
private static long timesent = 0;
private static int count = 2;
private static StringBuilder out = new StringBuilder();
private static StringBuilder in = new StringBuilder();
private static final int runs = 1000;
private static boolean receivefinished = false;
public static void main(String[] args) {
connect();
Thread sendThread=new Thread(new Runnable(){
@Override
public void run() {
Random rd = new Random();
for (int i = 2; i < 1000000; i += i) {
byte[] arr = new byte[i];
// System.out.println("Starting test run for byte Array of size:
// "+arr.length);
long startt = System.currentTimeMillis();
System.out.println("Test for size: " + i + " started.");
for (int a = 0; a <= runs; a++) {
rd.nextBytes(arr);
try {
client.publish(REMINDSPREFIX, arr, 1, false);
} catch (MqttPersistenceException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
while (!receivefinished) {
Thread.sleep(10);
}
receivefinished = false;
System.out.println("Test for size: " + i + " finished.");
out.append("Sending Payload size: " + arr.length + " achieved "
+ runs / ((System.currentTimeMillis() - startt) / 1000d) + " messages per second.\n");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println(out.toString());
System.out.println(in.toString());
}
});
sendThread.start();
}
private static class MessageCallback implements MqttCallback {
@Override
public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
if (messageReceived == 0) {
timesent = System.currentTimeMillis();
}
messageReceived++;
if (messageReceived >= runs) {
receivefinished = true;
in.append("Receiving payload size " + count + " achieved "
+ runs / ((System.currentTimeMillis() - timesent) / 1000d) + " messages per second.\n");
count += count;
messageReceived = 0;
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
}
@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
}
}
public static void connect() {
try {
client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
optionsPublisher.setAutomaticReconnect(true);
optionsPublisher.setCleanSession(false);
optionsPublisher.setMaxInflight(65535);
client.connect(optionsPublisher);
while (!client.isConnected()) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
client.setCallback(new MessageCallback());
client.subscribe(WILDCARDTOPIC, 0);
} catch (MqttException e1) {
e1.printStackTrace();
}
}
}
Run Code Online (Sandbox Code Playgroud)
奇怪的是,我要从应用程序发送的序列化消息仅使用约4000字节。因此,理论吞吐量应约为每秒200条消息。这可能是由于回调函数内部的较长计算导致的问题吗?使用mosquitto经纪人,我已经取得了更好的结果,并且我将进一步测试如何将其提升性能。
感谢您的任何建议!
一个问题是MQTT客户端的测试设置。
您仅使用一个MQTT客户端。您正在有效测试的是具有以下公式的MQTT飞行窗口的大小:
throughput <= inflight window-size / round-trip time
Run Code Online (Sandbox Code Playgroud)
默认情况下,HiveMQ启用了一个属性,该属性被<cluster-overload-protection>限制inflight window单个客户端的权限。
另外,paho客户端并不真正适合在多线程环境中进行高吞吐量工作。HiveMQ MQTT Client是针对高性能方案的更好实现。
通过连接20个客户端(10个发布和10个接收),我达到了每秒约6000 qos = 1 10kb消息的持续吞吐量。
免责声明:我是HiveMQ的软件开发人员。
| 归档时间: |
|
| 查看次数: |
189 次 |
| 最近记录: |