Rui*_*lho 21 python sockets windows python-3.x python-internals
我正在做一个从udp套接字接收样本的音频播放器,一切正常.但是当我实现一个丢失隐藏算法时,播放器未能以例外速率继续产生静音(每个10ms发送一个多个160字节的列表).
当用pyaudio播放音频时,使用阻塞调用写入来播放一些样本,我注意到它在样本持续时间内平均被阻止.所以我创建了一个新的专用流程来播放样本.
主进程处理音频输出流,并使用multiprocessing.Pipe将结果发送到该进程.我决定使用multiprocessing.Pipe,因为它应该比其他方式更快.
不幸的是,当我在虚拟机上运行该程序时,比特率是我在快速PC上获得的一半,它没有达到目标比特率.
经过一些测试,我得出结论,造成延迟的原因是Pipe的功能send.
我做了一个简单的基准测试脚本(见下文),以了解传输到流程的各种方法之间的差异.该脚本不断发送[b'\x00'*160]5秒,并计算总共发送字节对象的字节数.我测试了以下发送方法:"不发送",多处理.管道,多处理.Queue,multiprocessing.Manager,multiprocessing.Listener/Client,最后是socket.socket:
我的"快速"PC运行窗口7 x64的结果:
test_empty : 1516076640
test_pipe : 58155840
test_queue : 233946880
test_manager : 2853440
test_socket : 55696160
test_named_pipe: 58363040
Run Code Online (Sandbox Code Playgroud)
运行Windows 7 x64的VirtualBox VM guest虚拟机的结果,运行Windows 7 x64的主机:
test_empty : 1462706080
test_pipe : 32444160
test_queue : 204845600
test_manager : 882560
test_socket : 20549280
test_named_pipe: 35387840
Run Code Online (Sandbox Code Playgroud)
from multiprocessing import Process, Pipe, Queue, Manager
from multiprocessing.connection import Client, Listener
import time
FS = "{:<15}:{:>15}"
def test_empty():
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
sent += len(data)
if time.time()-s >= 5:
break
print(FS.format("test_empty", sent))
def pipe_void(pipe_in):
while True:
msg = pipe_in.recv()
if msg == []:
break
def test_pipe():
pipe_out, pipe_in = Pipe()
p = Process(target=pipe_void, args=(pipe_in,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
pipe_out.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
pipe_out.send([])
p.join()
print(FS.format("test_pipe", sent))
def queue_void(q):
while True:
msg = q.get()
if msg == []:
break
def test_queue():
q = Queue()
p = Process(target=queue_void, args=(q,))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
q.put(lst)
sent += len(data)
if time.time()-s >= 5:
break
q.put([])
p.join()
print(FS.format("test_queue", sent))
def manager_void(l, lock):
msg = None
while True:
with lock:
if len(l) > 0:
msg = l.pop(0)
if msg == []:
break
def test_manager():
with Manager() as manager:
l = manager.list()
lock = manager.Lock()
p = Process(target=manager_void, args=(l, lock))
p.start()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
with lock:
l.append(lst)
sent += len(data)
if time.time()-s >= 5:
break
with lock:
l.append([])
p.join()
print(FS.format("test_manager", sent))
def socket_void():
addr = ('127.0.0.1', 20000)
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_socket():
addr = ('127.0.0.1', 20000)
listener = Listener(addr, "AF_INET")
p = Process(target=socket_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_socket", sent))
def named_pipe_void():
addr = '\\\\.\\pipe\\Test'
conn = Client(addr)
while True:
msg = conn.recv()
if msg == []:
break
def test_named_pipe():
addr = '\\\\.\\pipe\\Test'
listener = Listener(addr, "AF_PIPE")
p = Process(target=named_pipe_void)
p.start()
conn = listener.accept()
s = time.time()
sent = 0
while True:
data = b'\x00'*160
lst = [data]
conn.send(lst)
sent += len(data)
if time.time()-s >= 5:
break
conn.send([])
p.join()
print(FS.format("test_named_pipe", sent))
if __name__ == "__main__":
test_empty()
test_pipe()
test_queue()
test_manager()
test_socket()
test_named_pipe()
Run Code Online (Sandbox Code Playgroud)
在我的程序中,在尝试使用Queues而不是Pipes之后.我得到了巨大的推动.
在我的电脑上,使用管道我得到了+ - 16000 B/s,使用队列我得到了-7.5百万B/s.在虚拟机上,我从+ -13000 B/s到650万B/s.这是使用Pipe的队列instread大约500倍的字节.
当然我不会每秒播放数百万字节,我只会播放正常的声音速率.(在我的情况下,16000 B/s,与上面的值一致).
但问题是,我可以将速率限制为我想要的速度,同时还有时间完成其他计算(例如从套接字接收,应用声音算法等)
我不能肯定地说,但我认为您正在处理的问题是同步与异步 I/O。我的猜测是,管道以某种方式结束同步,而队列以异步结束。为什么一个人默认一种方式,另一个人默认另一种方式,这个问题和答案可能会更好地回答:
| 归档时间: |
|
| 查看次数: |
1876 次 |
| 最近记录: |