识别多线程MQTT发布者中的瓶颈

Rad*_*ioo 2 java multithreading executorservice runnable mqtt

我目前正在与Eclipse Paho一起开发MQTT客户端服务,以使用更大的软件,并且遇到性能问题。我收到了很多要发布给代理的事件,并且正在使用GSON对这些事件进行序列化。我已经对序列化和发布进行了多线程处理。根据基本基准,序列化和发布最多需要1毫秒。我使用的ExecutorService的线程池大小为10(目前)。

我的代码当前每秒向ExecutorService提交大约50个Runnable,但是我的代理每秒仅报告5-10条消息。我之前已经对MQTT设置进行了基准测试,并设法以非多线程方式每秒发送约9000条以上的MQTT消息。

线程池是否有这么多的开销,我只能从中得到这么少的发布?

public class MqttService implements IMessagingService{
    protected int PORT = 1883;
    protected String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private MqttClient client;
    private MqttConnectOptions optionsPublisher = new MqttConnectOptions();

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    public MqttService() {
        this("localhost", 1883);
    }

    public MqttService(String host, int port) {
        this.HOST = host;
        this.PORT = port;

    }

    @Override
    public void setPort(int port) {
        this.PORT = port;
    }

    @Override
    public void setHost(String host) {
        this.HOST = host;
    }

    @Override
    public void sendMessage(AbstractMessage message) {
        pool.submit(new SerializeJob(client,message));
    }

    @Override
    public void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            client.connect(optionsPublisher);
            client.setCallback(new MessageCallback());
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

以下代码是ExecutorService执行的Runnable。但是,这本身不应该是一个问题,因为它只需要1-2毫秒即可完成。

class SerializeJob implements Runnable {
    private AbstractMessage message;
    private MqttClient client;

    public SerializeJob(MqttClient client, AbstractMessage m) {
        this.client = client;
        this.message = m;
    }

    @Override
    public void run() {
        String serializedMessage = MessageSerializer.serializeMessage(message);
        MqttMessage wireMessage = new MqttMessage();
        wireMessage.setQos(message.getQos());
        wireMessage.setPayload(serializedMessage.getBytes());
        if (client.isConnected()) {
            StringBuilder topic = new StringBuilder();
            topic.append(MqttService.REMINDSPREFIX);
            topic.append(MqttService.VIOLATIONTOPIC);
            try {
                client.publish(topic.toString(), wireMessage);
            } catch (MqttPersistenceException e) {
                e.printStackTrace();
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

我不太确定是什么使我退缩。MQTT本身似乎允许许多事件,这些事件也可能具有很大的负载,并且网络也不可能成为问题,因为我当前将代理本地托管在与客户端相同的机器上。

编辑并进行进一步测试:

现在,我已经对自己的安装程序进行了综合基准测试,该安装程序由本地托管的HiveMQ和Mosquitto经纪人组成,它们“自然”地在计算机上运行。使用Paho库,我已经按1000个批次发送了越来越多的消息。对于每个批次,我计算了从第一个消息到最后一个消息的消息吞吐量。这种情况下没有使用任何多线程。为此,我得出了以下性能图表:

具有2个不同代理的不同大小的MQTT消息的吞吐量

同时运行客户端和代理的计算机是具有i7 6700和32 GB RAM的台式机。代理可以访问其VM的所有内核和8 GB内存。

为了进行基准测试,我使用了以下代码:

import java.util.Random;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

public class MqttBenchmarker {
    protected static int PORT = 1883;
    protected static String HOST = "localhost";
    protected final String SERVICENAME = "MQTT";
    protected static final String COMMANDTOPIC = "commands";
    protected static final String REMINDSPREFIX = "Reminds/";
    protected static final String VIOLATIONTOPIC = "violations/";
    protected static final String WILDCARDTOPIC = "Reminds/#";
    protected static final String TCPPREFIX = "tcp://";
    protected static final String SSLPREFIX = "ssl://";

    private static MqttClient client;
    private static MqttConnectOptions optionsPublisher = new MqttConnectOptions();
    private static String IDPublisher = MqttClient.generateClientId();

    private static int messageReceived = 0;
    private static long timesent = 0;
    private static int count = 2;
    private static StringBuilder out = new StringBuilder();
    private static StringBuilder in = new StringBuilder();
    private static final int runs = 1000;
    private static boolean receivefinished = false;

    public static void main(String[] args) {
        connect();
        Thread sendThread=new Thread(new Runnable(){

            @Override
            public void run() {
                Random rd = new Random();
                for (int i = 2; i < 1000000; i += i) {
                    byte[] arr = new byte[i];
                    // System.out.println("Starting test run for byte Array of size:
                    // "+arr.length);
                    long startt = System.currentTimeMillis();
                    System.out.println("Test for size: " + i + " started.");
                    for (int a = 0; a <= runs; a++) {

                        rd.nextBytes(arr);
                        try {
                            client.publish(REMINDSPREFIX, arr, 1, false);
                        } catch (MqttPersistenceException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        } catch (MqttException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    try {
                        while (!receivefinished) {
                            Thread.sleep(10);
                        }
                        receivefinished = false;
                        System.out.println("Test for size: " + i + " finished.");
                        out.append("Sending Payload size: " + arr.length + " achieved "
                                + runs / ((System.currentTimeMillis() - startt) / 1000d) + " messages per second.\n");

                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                System.out.println(out.toString());
                System.out.println(in.toString());
            }

        });
        sendThread.start();

    }

    private static class MessageCallback implements MqttCallback {

        @Override
        public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
            if (messageReceived == 0) {
                timesent = System.currentTimeMillis();
            }
            messageReceived++;
            if (messageReceived >= runs) {
                receivefinished = true;
                in.append("Receiving payload size " + count + " achieved "
                        + runs / ((System.currentTimeMillis() - timesent) / 1000d) + " messages per second.\n");
                count += count;
                messageReceived = 0;
            }
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void connectionLost(Throwable arg0) {
            // TODO Auto-generated method stub

        }
    }

    public static void connect() {
        try {
            client = new MqttClient(TCPPREFIX + HOST + ":" + PORT, IDPublisher);
            optionsPublisher.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
            optionsPublisher.setAutomaticReconnect(true);
            optionsPublisher.setCleanSession(false);
            optionsPublisher.setMaxInflight(65535);
            client.connect(optionsPublisher);
            while (!client.isConnected()) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            client.setCallback(new MessageCallback());
            client.subscribe(WILDCARDTOPIC, 0);
        } catch (MqttException e1) {
            e1.printStackTrace();
        }
    }

}


Run Code Online (Sandbox Code Playgroud)

奇怪的是,我要从应用程序发送的序列化消息仅使用约4000字节。因此,理论吞吐量应约为每秒200条消息。这可能是由于回调函数内部的较长计算导致的问题吗?使用mosquitto经纪人,我已经取得了更好的结果,并且我将进一步测试如何将其提升性能。

感谢您的任何建议!

sau*_*ter 7

一个问题是MQTT客户端的测试设置。

您仅使用一个MQTT客户端。您正在有效测试的是具有以下公式的MQTT飞行窗口的大小:

  throughput <= inflight window-size / round-trip time
Run Code Online (Sandbox Code Playgroud)

默认情况下,HiveMQ启用了一个属性,该属性被<cluster-overload-protection>限制inflight window单个客户端的权限。

另外,paho客户端并不真正适合在多线程环境中进行高吞吐量工作。HiveMQ MQTT Client是针对高性能方案的更好实现。

通过连接20个客户端(10个发布和10个接收),我达到了每秒约6000 qos = 1 10kb消息的持续吞吐量。

在此处输入图片说明

免责声明:我是HiveMQ的软件开发人员。