在 python 中两个进程之间传递消息的最快方法是什么?

Ale*_*x T 9 python message-queue multiprocessing

我正在寻找最快的方法(就延迟而言)在两个进程之间进行通信,告知事件已发生。

更准确地说,我在共享内存中有 numpy 数组,其中一个进程(生产者)将更新写入数组,另一个进程(消费者)读取它们。

需要使用多处理,因为我们需要克服 GIL。生产者是 CPU/IO 密集型进程,它监听数据流并进行一些数据处理。

Consumer 是一个非常轻量且大部分空闲的进程,但是当 Producer 更新数组时我们需要尽快唤醒它。

还有一件事。以最小的延迟触发消费者比传输所有消息更重要。(例如,如果生产者连续发送三个消息而没有延迟,而消费者仅收到第一个消息并丢失了后面的两个消息 - 没关系。)


为此,我尝试了多处理原语 Pipe、Queue、Event,看起来它们在延迟方面几乎相同。管道是最稳定的。

多处理.管道

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, input_pipe):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        input_pipe.send(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    (ip, op) = mp.Pipe()
    p = mp.Process(target=main, args=(v, ip,))
    measurements = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        op.recv()
        measurements.append(get_mcs_diff(v.value))
        i += 1

    print(np.percentile(measurements, [50, 90, 95, 99], axis=0))
    p.join()

Run Code Online (Sandbox Code Playgroud)
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138.   206.1   238.   383.21]
Run Code Online (Sandbox Code Playgroud)

多处理队列

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [138.   206.1   238.   383.21]
Run Code Online (Sandbox Code Playgroud)
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187.   266.   299.05  444.06]
Run Code Online (Sandbox Code Playgroud)

多处理事件

import multiprocessing as mp
import numpy as np
import random
import time

ITER_COUNT = 1000


def get_mcs_diff(ts):
    return round((time.time() - ts) * 1e6, 0)


def main(v, q):
    for _ in range(ITER_COUNT):
        v.value = time.time()
        q.put(None)
        time.sleep((0.1 + random.random()) / 100)


if __name__ == "__main__":
    v = mp.Value('d', time.time())
    q = mp.Queue()
    p = mp.Process(target=main, args=(v, q,))
    measurments = []

    p.start()
    i = 0
    while i < ITER_COUNT:
        q.get()
        measurments.append(get_mcs_diff(v.value))
        i += 1

    print(measurments)
    print(np.percentile(measurments, [50, 90, 95, 99], axis=0))
    p.join()

Run Code Online (Sandbox Code Playgroud)
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [142.  222.1  256.05  1754.77]
Run Code Online (Sandbox Code Playgroud)

while True,忙循环

# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [187.   266.   299.05  444.06]
Run Code Online (Sandbox Code Playgroud)
# Output
# 50, 90, 95, 99 percentiles in microseconds
# > [ 33.    65.    81.   128.05]
Run Code Online (Sandbox Code Playgroud)

到目前为止,繁忙循环是最快的选择。但由于显而易见的原因,我想避免它。