在高消息速率下使用 pebbe zmq 代理时 CPU 使用率较高

Abh*_*hna 7 sockets arm message-passing go zeromq

我目前正在尝试使用 ZMQ 作为 IPC 的可能消息代理。
版本 -> ZMQv4
我正在使用pebbe ZMQ,这是一个基于 zmq 的 C 库的 go 库并执行测试。

我正在以 1500 条消息/秒和 10000 条消息/秒的消息速率对其进行速率测试。

我正在使用 XPUB-XSUB 架构,发布者和订阅者连接到代理。

我知道 go 库只是一个包装器,因此实际发送和接收消息涉及 c-go 调用。

我正在ARM架构的设备上进行实验。我看到在代理本身上以 1500 条消息/秒的速率传递消息时,CPU 使用率几乎为 40-50%(100% 为 ~1GB,RAM 上为 ~900MB,交换内存为 ~100MB)。

我不确定这是否可以或使用率很高。不太确定在这里用什么作为码尺。

我运行了分析测试,发现运行时 cgocall 和运行时 _ExternalCode 占用了大部分 CPU。已附上个人资料图的图像(不知道如何上传 SVG)。

在此输入图像描述

我试图了解或减少 CPU 使用率。根据这份资料,我认为我无能为力。有没有办法减少由于 C Go 调用和配置文件中显示的外部代码块而导致的 CPU 占用。
之前没有真正做过分析,所以可能不知道很多事情。

重现示例的代码:

ZMQ经纪商


import (
    "fmt"

    zmq "github.com/pebbe/zmq4"
    "github.com/pkg/profile"
)

func main() {
    defer profile.Start(profile.CPUProfile, profile.ProfilePath(".")).Stop()
    fmt.Println("Setting up XSUB socket")
    subscriberSocket, err := zmq.NewSocket(zmq.XSUB)
    if err != nil {
        fmt.Println("Error when creating XSUB socket -> ", err)
    }
    defer subscriberSocket.Close()

    err = subscriberSocket.Bind("tcp://127.0.0.1:8101")
    if err != nil {
        fmt.Println("Error when binding XSUB socket -> ", err)
    } else {
        fmt.Println("Succesfully accepting incoming connections on XSUB socket")
    }

    fmt.Println("Setting up XPUB socket")

    publisherSocket, err := zmq.NewSocket(zmq.XPUB)
    if err != nil {
        fmt.Println("Error when creating XPUB socket -> ", err)
    }
    defer publisherSocket.Close()

    err = publisherSocket.Bind("tcp://127.0.0.1:8100")
    if err != nil {
        fmt.Println("Error when binding XPUB socket -> ", err)
    } else {
        fmt.Println("Succesfully accepting incoming connections on XPUB socket")
    }

    err = zmq.Proxy(publisherSocket, subscriberSocket, nil)
    if err != nil {
        fmt.Println("Failed to start the XPUB XSUB broker -> ", err)
    }
}
Run Code Online (Sandbox Code Playgroud)

出版商


import (
    "time"

    zmq "github.com/pebbe/zmq4"

    "fmt"
)

func main() {
    publisher, err := zmq.NewSocket(zmq.PUB)
    if err != nil {
        fmt.Println("error when connecting to a pub socket -> ", err)
    }

    defer publisher.Close()

    err = publisher.Connect("tcp://127.0.0.1:8101")
    if err != nil {
        fmt.Println("error when connecting to a pub socket -> ", err)
    }

    for range time.Tick(time.Microsecond * 500) {
        sendToAll(publisher)
    }
}

func sendToAll(pub *zmq.Socket) {
    var message = "topicA test"
    _, err := pub.Send(message, zmq.DONTWAIT)
    if err != nil {
        println("error when sending message-> ", err)
    }
}
Run Code Online (Sandbox Code Playgroud)

订户


import (
    "os"
    "strconv"

    zmq "github.com/pebbe/zmq4"

    "fmt"
)

func main() {
    //  Socket to talk to server
    fmt.Println("Collecting updates from broker...")
    subscriber, err := zmq.NewSocket(zmq.SUB)
    if err != nil {
        fmt.Println("error when opening new socket to SUB -> ", err)
    }
    defer subscriber.Close()
    err = subscriber.Connect("tcp://127.0.0.1:8100")
    if err != nil {
        fmt.Println("error when connecting to XSUB port -> ", err)
    }

    err = subscriber.SetSubscribe("topicA ")
    if err != nil {
        fmt.Println("error when setting subscription filter -> ", err)
    }
    i := 0
    for {
        msg, err := subscriber.Recv(0)
        if err != nil {
            fmt.Println("error when reciveing subscription info -> ", err)
            os.Exit(1)
        }
        i += 1
        fmt.Println(msg + "\n -> count is" + strconv.Itoa(i))
    }
}

Run Code Online (Sandbox Code Playgroud)

我用于交叉编译的参数是:
不确定如何配置对性能是否重要

#!/bin/bash

ARM_PREFIX="arm-linux-androideabi-" 

TOOLCHAIN_PATH="/home/NDK/arm"

CC="${TOOLCHAIN_PATH}/bin/${ARM_PREFIX}gcc" \
CFLAGS="-march=armv7-a -mfpu=neon" \
GOOS=android \
GOARCH=arm \
GOARM=7 \
CGO_ENABLED=1 \
PKG_CONFIG_PATH="${TOOLCHAIN_PATH}/lib/pkgconfig" \
go build -o test
Run Code Online (Sandbox Code Playgroud)

可能的代理代码(我认为应该在那里)即使如此,CPU 也是相似的,所以我假设这与库实现的几乎相同。没有把握。

for {
    // this will block forever till an event occurs
    sockets, err := poller.Poll(-1)
    if err != nil {
        fmt.Println("error when establishing xpubsub poller")
    }
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case publisherSocket:
            msg, err := s.Recv(0)
            if err != nil {
                fmt.Println("error when recieving on publisherSocket")
            }
            subscriberSocket.Send(msg, zmq.DONTWAIT)
        case subscriberSocket:
            msg, err := s.Recv(0)
            if err != nil {
                fmt.Println("error when recieving on subscribersocket")
            }
            publisherSocket.Send(msg, zmq.DONTWAIT)
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

问题:

  1. 有没有办法减少 C Go 调用和外部代码带来的 CPU 占用?
  2. 我预计 CPU 利用率不会这么高,因为代理是一个阻塞轮询器,它会阻塞直到触发事件。可能我错了,因为微秒似乎是一个非常高的频率,不确定这对于 CPU 来说是否很高。
  3. 欣赏进行此类性能调整时需要考虑的指示和事项。