Python 3.4多处理队列比Pipe更快,意外

Rui*_*lho 21 python sockets windows python-3.x python-internals

我正在做一个从udp套接字接收样本的音频播放器,一切正常.但是当我实现一个丢失隐藏算法时,播放器未能以例外速率继续产生静音(每个10ms发送一个多个160字节的列表).

当用pyaudio播放音频时,使用阻塞调用写入来播放一些样本,我注意到它在样本持续时间内平均被阻止.所以我创建了一个新的专用流程来播放样本.

主进程处理音频输出流,并使用multiprocessing.Pipe将结果发送到该进程.我决定使用multiprocessing.Pipe,因为它应该比其他方式更快.

不幸的是,当我在虚拟机上运行该程序时,比特率是我在快速PC上获得的一半,它没有达到目标比特率.

经过一些测试,我得出结论,造成延迟的原因是Pipe的功能send.

我做了一个简单的基准测试脚本(见下文),以了解传输到流程的各种方法之间的差异.该脚本不断发送[b'\x00'*160]5秒,并计算总共发送字节对象的字节数.我测试了以下发送方法:"不发送",多处理.管道,多处理.Queue,multiprocessing.Manager,multiprocessing.Listener/Client,最后是socket.socket:

我的"快速"PC运行窗口7 x64的结果:

test_empty     :     1516076640
test_pipe      :       58155840
test_queue     :      233946880
test_manager   :        2853440
test_socket    :       55696160
test_named_pipe:       58363040
Run Code Online (Sandbox Code Playgroud)

运行Windows 7 x64的VirtualBox VM guest虚拟机的结果,运行Windows 7 x64的主机:

test_empty     :     1462706080
test_pipe      :       32444160
test_queue     :      204845600
test_manager   :         882560
test_socket    :       20549280
test_named_pipe:       35387840  
Run Code Online (Sandbox Code Playgroud)

使用的脚本:

from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time

FS = "{:<15}:{:>15}"


def test_empty():
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]

        sent += len(data)
        if time.time()-s >= 5:
            break
    print(FS.format("test_empty", sent))


def pipe_void(pipe_in):
    while True:
        msg = pipe_in.recv()
        if msg == []:
            break


def test_pipe():
    pipe_out, pipe_in = Pipe()
    p = Process(target=pipe_void, args=(pipe_in,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        pipe_out.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    pipe_out.send([])
    p.join()
    print(FS.format("test_pipe", sent))


def queue_void(q):
    while True:
        msg = q.get()
        if msg == []:
            break


def test_queue():
    q = Queue()
    p = Process(target=queue_void, args=(q,))
    p.start()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        q.put(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    q.put([])
    p.join()

    print(FS.format("test_queue", sent))


def manager_void(l, lock):
    msg = None
    while True:
        with lock:
            if len(l) > 0:
                msg = l.pop(0)
        if msg == []:
            break


def test_manager():
    with Manager() as manager:
        l = manager.list()
        lock = manager.Lock()
        p = Process(target=manager_void, args=(l, lock))
        p.start()
        s = time.time()
        sent = 0
        while True:
            data = b'\x00'*160
            lst = [data]
            with lock:
                l.append(lst)
            sent += len(data)
            if time.time()-s >= 5:
                break
        with lock:
            l.append([])
        p.join()

        print(FS.format("test_manager", sent))


def socket_void():
    addr = ('127.0.0.1', 20000)
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_socket():
    addr = ('127.0.0.1', 20000)
    listener = Listener(addr, "AF_INET")
    p = Process(target=socket_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_socket", sent))


def named_pipe_void():
    addr = '\\\\.\\pipe\\Test'
    conn = Client(addr)
    while True:
        msg = conn.recv()
        if msg == []:
            break


def test_named_pipe():
    addr = '\\\\.\\pipe\\Test'
    listener = Listener(addr, "AF_PIPE")
    p = Process(target=named_pipe_void)
    p.start()
    conn = listener.accept()
    s = time.time()
    sent = 0
    while True:
        data = b'\x00'*160
        lst = [data]
        conn.send(lst)
        sent += len(data)
        if time.time()-s >= 5:
            break
    conn.send([])
    p.join()

    print(FS.format("test_named_pipe", sent))


if __name__ == "__main__":
    test_empty()
    test_pipe()
    test_queue()
    test_manager()
    test_socket()
    test_named_pipe()
Run Code Online (Sandbox Code Playgroud)

  • 如果Queue使用Pipe在这种情况下如何比Pipe更快?这与Python多处理 - 管道与队列的问题相矛盾
  • 我怎样才能保持从进程到另一个的恒定比特率流,同时具有低发送延迟?

更新1

在我的程序中,在尝试使用Queues而不是Pipes之后.我得到了巨大的推动.

在我的电脑上,使用管道我得到了+ - 16000 B/s,使用队列我得到了-7.5百万B/s.在虚拟机上,我从+ -13000 B/s到650万B/s.这是使用Pipe的队列instread大约500倍的字节.

当然我不会每秒播放数百万字节,我只会播放正常的声音速率.(在我的情况下,16000 B/s,与上面的值一致).
但问题是,我可以将速率限制为我想要的速度,同时还有时间完成其他计算(例如从套接字接收,应用声音算法等)

Mik*_*ord 3

我不能肯定地说,但我认为您正在处理的问题是同步与异步 I/O。我的猜测是,管道以某种方式结束同步,而队列以异步结束。为什么一个人默认一种方式,另一个人默认另一种方式,这个问题和答案可能会更好地回答:

python Pipes 的同步/异步行为