Python UDP套接字发送瓶颈(慢/延迟随机)

Jan*_*Jan 9 python sockets gil

Python UDP Streamer与发送中的hickup

我目前正在开发一个python 3.4网络流媒体应用程序.我的套接字有一些疯狂的行为.(如果可能,目标3.3兼容)
定义:当我谈到Stream时,意味着UDP-Stream.

问题

虽然发送socket.send操作有时会开始花费1-3ms,因为我将在下面描述更多传输目标.我在这里找到了其他线程来讲述速度问题,但他们每秒发送200k包,但他们只发送"A".在我的情况下,每个数据包是1500字节公司.由socket添加的UDP和IP头.如果此时问题不明确,请参阅下面的解释.

有谁知道为什么这会延误?或者如何加快发送到达实时?

我的测试代码如下所示:

def _transfer(self):
    self.total_num_samps_sent = 0
    self.sequence_out = 0
    self.send_in_progress = True
    send = self.udp_socket.send
    for i in range(0, len(streams), 1):
        stream_data, stream_samps, stream_seq = self.packed_streams[i]
        # commit the samples
        start_try_send_time = monotonic()
        while not self.ready():
            if monotonic() - start_try_send_time > self.timeout > 0:
                # timeout; if timeout == 0 wait endless
                return 0
        self.sequence_out = stream_seq
        # ######################
        # Here is the bottleneck
        # ######################
        s = monotonic()
        send(stream_data)
        e = monotonic()
        if e-s > 0:
            print(str(i) + ': ' + str(e-s))
        # #####################
        # end measure monotonic
        # #####################
    self.total_num_samps_sent += stream_samps
    self.send_in_progress = False
Run Code Online (Sandbox Code Playgroud)

self.packed_streams包含一个元组列表(data_in_bytes(),number_samples_in_this_stream,sequence_out)函数self.ready()返回True,当有目标的ACK发送足够的数据包时(有空闲RAM).

特殊标记的瓶颈是更详细的描述:看得多一点

套接字创建如下:

self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
    self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
    error = ('OS Error: {0}'.format(os_error)
             + linesep + 'IP src: ' + str(self.ip_own)
             + linesep + 'IP dst: ' + str(self.ip_usrp)
             + linesep + 'Port: {0}'.format(port))
    exit(error)
self.udp_socket.connect(self.target)
# not helps to set to non blocking
# self.udp_socket.setblocking(False) 
Run Code Online (Sandbox Code Playgroud)

sendfunction(第一个代码块)作为单独的线程运行.UDPFlowControl也产生了另一个线程.在与发送流媒体相同的套接字上运行(Streamer继承FlowControl并使用其就绪状态)

UDPFlowControl

def _worker(self):
    """
    * Receive Loop
    * - update flow control condition count
    * - put async message packets into queue
    """
    self.send_here_am_i()
    while 1:
        ready = select([self.udp_socket], [], [], self.socket_timeout)
        if ready[0]:
            try:
                data_in = self.udp_socket.recv(2048)
            except:
                # ignore timeout/error buffers
                continue
            # with suppress(Exception):  #ToDo Reenable after test is done
            bytes_in = len(data_in)
            self.data_received += bytes_in
            # extract the vrt header packet info
            vrt = VRTImplementation()
            vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
            if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
                continue
            # handle a tx async report message
            if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
                # fill in the async metadata
                metadata = MetadataAsync()
                metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
                                       self.tick_rate)
                # catch the flow control packets and react
                if metadata.event_code == EventCode.FLOW_CONTROL:
                    self.sequence_in = \
                        unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
                    continue
                self.async_msg_fifo.append(metadata)
            else:
                # TODO: unknown packet
                pass

def ready(self):
    """
    Check if less ack are outstanding than max allowed
    :returns bool: if device can get more data
    """
    return self.sequence_out - self.sequence_in < self.max_sequence_out
Run Code Online (Sandbox Code Playgroud)

CPROFILE

<<删除旧基准>>如果再次需要此信息,请查看历史记录!

如上所述,单调剖析是我提问的原因.如您所见,0的时间被忽略.输出如下所示:(该流包含5秒的数据(要发送的2754,8个字节流),结果大小(wireshark)各为1500字节

Send:  445.40K of    5.00M, Sending:  True @ monotonic time:   44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send:  724.18K of    5.00M, Sending:  True @ monotonic time:   44927.3200
....
Run Code Online (Sandbox Code Playgroud)

第一个数字是延迟打包的索引.第二个数字是此延迟的差异时间单调.这里没有显示,但在我的日志中我发现时间如7582:0.030999999995401595,有时更高,为0.06 ...

以Send开头的行是将当前状态写入控制台的主线程.写完后再睡250ms.

我的问题是目前系统只运行目标速度的1/25并且已经启动了这个hickups,正如你在cProfile中看到的那样,这需要将近30秒来发送5秒的流.每1500Bytes的目标速度为68870P/s,约为98.5MByte,包含开销@ GbE => 125MByte/s限制.

这是单一目标应用程序.通常通过网络线直接连接到设备,无需任何路由器,交换机等.所以网络只属于这个应用程序和设备.

到目前为止我做了什么:

  • 正如您在代码中看到的那样,我将测试最小化,流已经在内存中准备好转移到设备,不再需要转换,只放在插槽内.
  • 测试是否选择发送套接字是否准备就绪,开始单调,在套接字内抛出数据,停止单调并查看结果.
  • 用wireshark检查网络(13774发送电话13774出现在wireshark,我数~1310 hickups)
  • 想想GIL是理由,但很难弄清楚.
  • 测试时关闭防火墙 - 没有变化
  • [编辑1]使用Boost的C++中的Testet如果socket可以以目标速度执行,这里它也有hickups但它们要短得多100-1000μs(设备中的1MB缓冲区可以处理)

在所有测试中请记住,print命令仅用于调试.一半的单调调用也用于调试目的.

<<删除旧基准>>如果再次需要此信息,请查看历史记录!

使用Python 3.4.2在Windows 7 x64上运行.@ Corei7 2630QM和8GB RAM

<<删除旧基准>>如果再次需要此信息,请查看历史记录!

编辑3

首先,因为我可以快速回答它cProfile在Thread内部运行,_worker仍然是一个未经编译的第二个线程,因为等待准备的时间很短(总和约0.05)我猜它运行得足够快._send函数是线程入口,更多的是能够cProfile这个Thread的包装器.

def _send(self):
    profile = cProfile.Profile()
    profile.enable()
    self._transfer()
    profile.disable()
    profile.print_stats()
Run Code Online (Sandbox Code Playgroud)

禁用超时并重新运行分析需要等待1或2天我正在清理代码,因为仍然在后台保持线程处于暂停状态(250ms睡眠)我认为让它们死亡并在使用时重生是不成问题的.完成后,我将重试测试.更多我认为GIL在这里是邪恶的.可能是在流控制中解压缩包的过程以及线程之间的切换,这可能需要一些时间并导致这个hickup.(如果我理解GIL正确 - 只有一个线程可以同时执行python代码,但我想知道为什么这总是命中套接字动作,而不是以更加平等的方式分割就绪和发送呼叫,如40/60-50/50 )因此,我的待办事项清单上有期货包,以实现与流程的真正多核使用.为了测试这一点,我将设置ready to permanent返回True并且FlowControl Thread不能在1st命令中启动或返回.

该程序的目标是在Linux,Windows,Mac和Unix上运行.

编辑4

首先关于线程 - 它们没有这里提到的优先级:控制python线程的调度优先级? 我相信没有办法改变它.运行的核心Python最高可达25%.调试器运行时,整体系统负载约为10%.

选择运行只是一个测试.我删除了发送例程中的选择代码并测试了是否有超时:

<<删除旧基准>>如果再次需要此信息,请查看历史记录!

线程清理旧代码的示例

在这个例子中,我杀死了所有线程,而不是让他们睡觉.主线程睡眠时间更长.没有FlowControl @ 5M

         41331 function calls in 2.935 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    2.007    2.007    2.935    2.935 SendStreamer.py:297(_transfer)
 13776    0.005    0.000    0.005    0.000 UDPFlowControl.py:52(ready)
     1    0.000    0.000    0.000    0.000 {built-in method len}
 13776    0.007    0.000    0.007    0.000 {built-in method monotonic}
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 13776    0.915    0.000    0.915    0.000 {method 'send' of '_socket.socket' objects}
Run Code Online (Sandbox Code Playgroud)

使用FlowControl @ 5M

这里等待设备的时间比发送时间长.

            68873 function calls in 5.245 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    4.210    4.210    5.245    5.245 SendStreamer.py:297(_transfer)
 27547    0.030    0.000    0.030    0.000 UDPFlowControl.py:52(ready)
     1    0.000    0.000    0.000    0.000 {built-in method len}
 27547    0.011    0.000    0.011    0.000 {built-in method monotonic}
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 13776    0.993    0.000    0.993    0.000 {method 'send' of '_socket.socket' objects}
Run Code Online (Sandbox Code Playgroud)

仍然开放:分成流程. - 仍在为进程使用重构类结构(我认为最新结束可能会添加一些新结果).在一些更详细的基准测试中,我发现第二个线程(解包VRT)几乎花费了每个hickups持续时间.通过流程,这不应该成为减速的可能原因.

我希望有所需的信息,如果我忘了一些请问!

[编辑1]添加了我已完成列表的信息

[编辑2]添加了第二测试系统(Manjaro)的cProfiles

[Edit3]添加了有关cProfile如何运行的信息.

[Edit4]更多cProfiles +有关线程的答案

[编辑5]删除了旧基准

Dim*_*nek 2

我可以在 Linux 上以非特权用户 python2 身份运行来确认这一点。

我认为你能做的不多:

# timing code:
In [16]: @contextlib.contextmanager
   ....: def timeit():
   ....:     st = time.time()
   ....:     yield
   ....:     en = time.time()
   ....:     b = int(math.log10(en - st))
   ....:     data.setdefault(b, 0)
   ....:     data[b] += 1

# Thus, timing data means:
-6: number of times send took between 0.00000011 and 0.000001s
-4: 0.0000011 ~ 0.00001
-4: 0.000011 ~ 0.0001
-3: 0.00011 ~ 0.001 (up to millisecond)
-2: 0.0011 ~ 0.01 (1..10ms)

# Regular blocking socket
{-6: 2807, -5: 992126, -4: 5049, -3: 18}
# Non-blocking socket
{-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
# socket with timeout=0
{-6: 2249, -5: 992994, -4: 4749, -3: 8}
# socket with timeout=1
{-5: 994259, -4: 5727, -3: 8, -2: 6}
Run Code Online (Sandbox Code Playgroud)

看起来这个分布的尾部是指数分布的。

我还设置了更大的发送缓冲区,并偶尔添加time.sleep给内核时间来发送排队的数据包,但这并没有帮助。这是有道理的,因为非阻塞偶尔也会导致发送缓慢。

我还尝试根据http://www.pycopia.net/_modules/pycopia/socket.html 函数明确等待发送队列为空outq,并且这也没有改变分布。