Jan*_*Jan 9 python sockets gil
我目前正在开发一个python 3.4网络流媒体应用程序.我的套接字有一些疯狂的行为.(如果可能,目标3.3兼容)
定义:当我谈到Stream时,意味着UDP-Stream.
问题
虽然发送socket.send操作有时会开始花费1-3ms,因为我将在下面描述更多传输目标.我在这里找到了其他线程来讲述速度问题,但他们每秒发送200k包,但他们只发送"A".在我的情况下,每个数据包是1500字节公司.由socket添加的UDP和IP头.如果此时问题不明确,请参阅下面的解释.
题
有谁知道为什么这会延误?或者如何加快发送到达实时?
def _transfer(self):
self.total_num_samps_sent = 0
self.sequence_out = 0
self.send_in_progress = True
send = self.udp_socket.send
for i in range(0, len(streams), 1):
stream_data, stream_samps, stream_seq = self.packed_streams[i]
# commit the samples
start_try_send_time = monotonic()
while not self.ready():
if monotonic() - start_try_send_time > self.timeout > 0:
# timeout; if timeout == 0 wait endless
return 0
self.sequence_out = stream_seq
# ######################
# Here is the bottleneck
# ######################
s = monotonic()
send(stream_data)
e = monotonic()
if e-s > 0:
print(str(i) + ': ' + str(e-s))
# #####################
# end measure monotonic
# #####################
self.total_num_samps_sent += stream_samps
self.send_in_progress = False
Run Code Online (Sandbox Code Playgroud)
self.packed_streams包含一个元组列表(data_in_bytes(),number_samples_in_this_stream,sequence_out)函数self.ready()返回True,当有目标的ACK发送足够的数据包时(有空闲RAM).
特殊标记的瓶颈是更详细的描述:看得多一点
self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
error = ('OS Error: {0}'.format(os_error)
+ linesep + 'IP src: ' + str(self.ip_own)
+ linesep + 'IP dst: ' + str(self.ip_usrp)
+ linesep + 'Port: {0}'.format(port))
exit(error)
self.udp_socket.connect(self.target)
# not helps to set to non blocking
# self.udp_socket.setblocking(False)
Run Code Online (Sandbox Code Playgroud)
sendfunction(第一个代码块)作为单独的线程运行.UDPFlowControl也产生了另一个线程.在与发送流媒体相同的套接字上运行(Streamer继承FlowControl并使用其就绪状态)
def _worker(self):
"""
* Receive Loop
* - update flow control condition count
* - put async message packets into queue
"""
self.send_here_am_i()
while 1:
ready = select([self.udp_socket], [], [], self.socket_timeout)
if ready[0]:
try:
data_in = self.udp_socket.recv(2048)
except:
# ignore timeout/error buffers
continue
# with suppress(Exception): #ToDo Reenable after test is done
bytes_in = len(data_in)
self.data_received += bytes_in
# extract the vrt header packet info
vrt = VRTImplementation()
vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
continue
# handle a tx async report message
if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
# fill in the async metadata
metadata = MetadataAsync()
metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
self.tick_rate)
# catch the flow control packets and react
if metadata.event_code == EventCode.FLOW_CONTROL:
self.sequence_in = \
unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
continue
self.async_msg_fifo.append(metadata)
else:
# TODO: unknown packet
pass
def ready(self):
"""
Check if less ack are outstanding than max allowed
:returns bool: if device can get more data
"""
return self.sequence_out - self.sequence_in < self.max_sequence_out
Run Code Online (Sandbox Code Playgroud)
<<删除旧基准>>如果再次需要此信息,请查看历史记录!
如上所述,单调剖析是我提问的原因.如您所见,0的时间被忽略.输出如下所示:(该流包含5秒的数据(要发送的2754,8个字节流),结果大小(wireshark)各为1500字节
Send: 445.40K of 5.00M, Sending: True @ monotonic time: 44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send: 724.18K of 5.00M, Sending: True @ monotonic time: 44927.3200
....
Run Code Online (Sandbox Code Playgroud)
第一个数字是延迟打包的索引.第二个数字是此延迟的差异时间单调.这里没有显示,但在我的日志中我发现时间如7582:0.030999999995401595,有时更高,为0.06 ...
以Send开头的行是将当前状态写入控制台的主线程.写完后再睡250ms.
我的问题是目前系统只运行目标速度的1/25并且已经启动了这个hickups,正如你在cProfile中看到的那样,这需要将近30秒来发送5秒的流.每1500Bytes的目标速度为68870P/s,约为98.5MByte,包含开销@ GbE => 125MByte/s限制.
这是单一目标应用程序.通常通过网络线直接连接到设备,无需任何路由器,交换机等.所以网络只属于这个应用程序和设备.
到目前为止我做了什么:
在所有测试中请记住,print命令仅用于调试.一半的单调调用也用于调试目的.
<<删除旧基准>>如果再次需要此信息,请查看历史记录!
使用Python 3.4.2在Windows 7 x64上运行.@ Corei7 2630QM和8GB RAM
<<删除旧基准>>如果再次需要此信息,请查看历史记录!
首先,因为我可以快速回答它cProfile在Thread内部运行,_worker仍然是一个未经编译的第二个线程,因为等待准备的时间很短(总和约0.05)我猜它运行得足够快._send函数是线程入口,更多的是能够cProfile这个Thread的包装器.
def _send(self):
profile = cProfile.Profile()
profile.enable()
self._transfer()
profile.disable()
profile.print_stats()
Run Code Online (Sandbox Code Playgroud)
禁用超时并重新运行分析需要等待1或2天我正在清理代码,因为仍然在后台保持线程处于暂停状态(250ms睡眠)我认为让它们死亡并在使用时重生是不成问题的.完成后,我将重试测试.更多我认为GIL在这里是邪恶的.可能是在流控制中解压缩包的过程以及线程之间的切换,这可能需要一些时间并导致这个hickup.(如果我理解GIL正确 - 只有一个线程可以同时执行python代码,但我想知道为什么这总是命中套接字动作,而不是以更加平等的方式分割就绪和发送呼叫,如40/60-50/50 )因此,我的待办事项清单上有期货包,以实现与流程的真正多核使用.为了测试这一点,我将设置ready to permanent返回True并且FlowControl Thread不能在1st命令中启动或返回.
该程序的目标是在Linux,Windows,Mac和Unix上运行.
首先关于线程 - 它们没有这里提到的优先级:控制python线程的调度优先级? 我相信没有办法改变它.运行的核心Python最高可达25%.调试器运行时,整体系统负载约为10%.
选择运行只是一个测试.我删除了发送例程中的选择代码并测试了是否有超时:
<<删除旧基准>>如果再次需要此信息,请查看历史记录!
在这个例子中,我杀死了所有线程,而不是让他们睡觉.主线程睡眠时间更长.没有FlowControl @ 5M
41331 function calls in 2.935 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 2.007 2.007 2.935 2.935 SendStreamer.py:297(_transfer)
13776 0.005 0.000 0.005 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
13776 0.007 0.000 0.007 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.915 0.000 0.915 0.000 {method 'send' of '_socket.socket' objects}
Run Code Online (Sandbox Code Playgroud)
这里等待设备的时间比发送时间长.
68873 function calls in 5.245 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 4.210 4.210 5.245 5.245 SendStreamer.py:297(_transfer)
27547 0.030 0.000 0.030 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
27547 0.011 0.000 0.011 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.993 0.000 0.993 0.000 {method 'send' of '_socket.socket' objects}
Run Code Online (Sandbox Code Playgroud)
仍然开放:分成流程. - 仍在为进程使用重构类结构(我认为最新结束可能会添加一些新结果).在一些更详细的基准测试中,我发现第二个线程(解包VRT)几乎花费了每个hickups持续时间.通过流程,这不应该成为减速的可能原因.
我希望有所需的信息,如果我忘了一些请问!
[编辑1]添加了我已完成列表的信息
[编辑2]添加了第二测试系统(Manjaro)的cProfiles
[Edit3]添加了有关cProfile如何运行的信息.
[Edit4]更多cProfiles +有关线程的答案
[编辑5]删除了旧基准
我可以在 Linux 上以非特权用户 python2 身份运行来确认这一点。
我认为你能做的不多:
# timing code:
In [16]: @contextlib.contextmanager
....: def timeit():
....: st = time.time()
....: yield
....: en = time.time()
....: b = int(math.log10(en - st))
....: data.setdefault(b, 0)
....: data[b] += 1
# Thus, timing data means:
-6: number of times send took between 0.00000011 and 0.000001s
-4: 0.0000011 ~ 0.00001
-4: 0.000011 ~ 0.0001
-3: 0.00011 ~ 0.001 (up to millisecond)
-2: 0.0011 ~ 0.01 (1..10ms)
# Regular blocking socket
{-6: 2807, -5: 992126, -4: 5049, -3: 18}
# Non-blocking socket
{-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
# socket with timeout=0
{-6: 2249, -5: 992994, -4: 4749, -3: 8}
# socket with timeout=1
{-5: 994259, -4: 5727, -3: 8, -2: 6}
Run Code Online (Sandbox Code Playgroud)
看起来这个分布的尾部是指数分布的。
我还设置了更大的发送缓冲区,并偶尔添加time.sleep给内核时间来发送排队的数据包,但这并没有帮助。这是有道理的,因为非阻塞偶尔也会导致发送缓慢。
我还尝试根据http://www.pycopia.net/_modules/pycopia/socket.html 函数明确等待发送队列为空outq,并且这也没有改变分布。