如何统计子进程发出的UDP数据包数量?

exh*_*uma 9 python sockets linux networking

我有一个Python应用程序,它负责编排对底层进程的调用。这些进程通过 subprocess.check_output 启动,并对远程网络设备发起SNMP调用。

对于性能监视,我想计算发送的SNMP数据包的数量.我主要对数据包的数量感兴趣.请求/响应的数据包大小也很有趣,但不太重要.目的是了解此应用程序导致的防火墙压力.

因此,为了论证,让我们假设以下愚蠢的应用程序:

from subprocess import check_output
# Run a single SNMP GET through the `snmpget` command-line tool and print
# whatever it wrote to stdout.
snmpget_cmd = ['snmpget', '-v2c', '-c', 'private', '192.168.1.1',
               '1.3.6.1.2.1.1.2.0']
output = check_output(snmpget_cmd)
print(output)
Run Code Online (Sandbox Code Playgroud)

这将导致在端口161上发送新的UDP数据包.

在这种情况下,我该如何统计这些数据包?

这是带有存根函数的另一个版本(也可以是上下文管理器):

from subprocess import check_call


def start_monitoring():
    """Begin counting outgoing packets (placeholder stub: does nothing)."""
    return None


def stop_monitoring():
    """Stop counting and return the total (placeholder stub: always 0)."""
    packets_seen = 0
    return packets_seen


# Desired usage: bracket the subprocess calls with the monitoring hooks and
# compare the reported count with the number of SNMP GETs issued.
start_monitoring()
snmpget_cmd = ['snmpget', '-v2c', '-c', 'private', '192.168.1.1',
               '1.3.6.1.2.1.1.2.0']
for _ in range(3):
    check_call(snmpget_cmd)
num_connections = stop_monitoring()
assert num_connections == 3
Run Code Online (Sandbox Code Playgroud)

在这个人为构造的例子中,SNMP调用是我手动执行的,所以显然一共有3次调用。但在实际应用中,SNMP调用的数量并不等于对子进程的调用次数。有时执行的是一个或多个GET,有时是简单的 walk(即许多连续的UDP请求),有时是 bulk walk(请求数量未知)。

所以我不能简单地监视调用应用程序的次数.我真的要监视UDP请求.

这样的事情甚至可能吗?如果有,怎么样?

有一点很可能很重要:这一切需要在Linux上以非root用户身份运行。不过,所有子进程都以同一用户身份运行。

exh*_*uma 2

参考这个答案,以及(通过另一个答案找到的)这个 GitHub 仓库,我写出了下面这个 UDP 代理/中继的实现:

#!/usr/bin/env python

from collections import namedtuple
from contextlib import contextmanager
from random import randint
from time import sleep
import errno
import logging
import socket
import threading

import snmp


# Flag for a non-blocking receive.  The numeric value is platform-specific,
# so prefer the one exported by the socket module; 0x40 (the value from
# Linux' socket.h) is only the fallback when the module does not export it.
MSG_DONTWAIT = getattr(socket, 'MSG_DONTWAIT', 0x40)
# Held by UdpProxyThread.prime() while it picks and binds a local port.
LOCK = threading.Lock()

# Direction markers stored in Statistics.msgtype.
MSG_TYPE_REQUEST = 1
MSG_TYPE_RESPONSE = 2
# One record per proxied packet: its direction and its size in bytes.
Statistics = namedtuple('Statistics', 'msgtype packet_size')


def visible_octets(data: bytes) -> str:
    """
    Return a geek-friendly (hexdump) rendering of a bytes object.

    The output has one line per 16 octets.  Each line shows the octets as
    two-digit lowercase hex values (with an extra gap after the eighth one),
    padded to a fixed-width column, followed by the same octets as ASCII
    text.  Octets outside the printable range 32..126 are shown as ``.``.
    Empty input yields an empty string, and no blank line is appended when
    the input length is an exact multiple of 16.

    Developer note:
        This is not super performant. But it's not something that's supposed to
        be run during normal operations (mostly for testing and debugging).  So
        performance should not be an issue, and this is less obfuscated than
        existing solutions.

    Example::

        >>> from os import urandom
        >>> print(visible_octets(urandom(40)))
        99 1f 56 a9 25 50 f7 9b  95 7e ff 80 16 14 88 c5   ..V.%P...~......
        f3 b4 83 d4 89 b2 34 b4  71 4e 5a 69 aa 9f 1d f8   ......4.qNZi....
        1d 33 f9 8e f1 b9 12 e9                            .3......

    """
    rows = []
    for offset in range(0, len(data), 16):
        chunk = data[offset:offset + 16]
        left = ' '.join('%02x' % octet for octet in chunk[:8])
        right = ' '.join('%02x' % octet for octet in chunk[8:])
        # One character per octet, so the ASCII column always lines up with
        # the hex column.
        printable = ''.join(
            chr(octet) if 32 <= octet < 127 else '.' for octet in chunk)
        # The hex column is padded to 50 characters, so the trailing gap of
        # a short (or exactly half-full) line disappears into the padding.
        rows.append('%-50s %s' % (left + '  ' + right, printable))
    return '\n'.join(rows)


@contextmanager
def UdpProxy(remote_host, remote_port, queue=None):
    """
    Run a UDP proxy towards ``remote_host:remote_port`` for a ``with`` block.

    A ``UdpProxyThread`` is created, bound to a local port and started.  The
    local port number is yielded, so the caller can point its client at that
    port instead of the real target.  On leaving the block the thread is
    stopped and joined, also when the block raises.

    :param remote_host: Host the proxy forwards requests to.
    :param remote_port: UDP port on *remote_host*.
    :param queue: Optional queue handed to the thread as its ``stats_queue``.
    """
    thread = UdpProxyThread(remote_host, remote_port, stats_queue=queue)
    thread.prime()
    thread.start()
    try:
        yield thread.local_port
    finally:
        # Without this, an exception in the with-body would leave the proxy
        # thread running and its socket open.
        thread.stop()
        thread.join()


class UdpProxyThread(threading.Thread):
    """
    Relay UDP datagrams between one local client and a fixed remote endpoint.

    The thread listens on *local_port*.  The sender of the first datagram it
    receives is remembered as the client.  Datagrams from that address are
    forwarded to ``(remote_host, remote_port)``; datagrams from any other
    address are forwarded back to the client.  If a *stats_queue* was given,
    one ``Statistics`` record is put on it per forwarded datagram.

    Usage: call :meth:`prime` (binds the socket), then :meth:`start`, and
    finally :meth:`stop` followed by ``join()``.
    """

    # Seconds a receive may block before the ``running`` flag is re-checked.
    POLL_INTERVAL = 0.1

    def __init__(self, remote_host, remote_port, stats_queue=None):
        """
        :param remote_host: Host that requests are forwarded to.
        :param remote_port: UDP port on *remote_host*.
        :param stats_queue: Optional queue-like object (needs ``put``) that
            receives a ``Statistics`` record for each forwarded datagram.
        """
        super().__init__()
        self.local_port = randint(60000, 65535)
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.daemon = True
        self.log = logging.getLogger('%s.%s' % (
            __name__, self.__class__.__name__))
        self.running = True
        self._socket = None
        self.stats_queue = stats_queue

    def fail(self, reason):
        """Log *reason* at debug level and ask the relay loop to end."""
        self.log.debug('UDP Proxy Failure: %s', reason)
        self.running = False

    def prime(self):
        """
        We need to set up a socket on a FREE port for this thread. Retry until
        we find a free port.

        This is used as a separate method to ensure proper locking and to ensure
        that each thread has its own port.

        The port can be retrieved by accessing the *local_port* member of the
        thread.

        :raises OSError: If creating or binding the socket fails for any
            reason other than the port being in use.
        """
        with LOCK:
            while True:
                candidate = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                try:
                    candidate.bind(('', self.local_port))
                except OSError as exc:
                    # Never keep a socket that failed to bind.
                    candidate.close()
                    if exc.errno != errno.EADDRINUSE:
                        raise
                    self.log.warning('Port %d already in use. Shuffling...',
                                     self.local_port)
                    self.local_port = randint(60000, 65535)
                else:
                    self._socket = candidate
                    return

    @property
    def name(self):
        """Human-readable description used in log messages."""
        return 'UDP Proxy Thread {} -> {}:{}'.format(self.local_port,
                                                     self.remote_host,
                                                     self.remote_port)

    def start(self):
        """
        Start the relay loop.

        :raises ValueError: If :meth:`prime` has not bound a socket yet.
        """
        if not self._socket:
            raise ValueError('Socket was not set. Call prime() first!')
        super().start()

    def _log_packet(self, message, peer, data):
        """Emit a debug record with a hexdump, only if debug is enabled."""
        if self.log.isEnabledFor(logging.DEBUG):
            self.log.debug(message, peer, visible_octets(data))

    def _record(self, msgtype, data):
        """Put a Statistics record on the stats queue, if one was given."""
        if self.stats_queue is not None:
            self.stats_queue.put(Statistics(msgtype, len(data)))

    def run(self):
        """Relay datagrams until ``running`` is cleared, then close the socket."""
        try:
            known_client = None
            known_server = (self.remote_host, self.remote_port)
            self.log.info('UDP Proxy set up: %s -> %s:%s',
                          self.local_port, self.remote_host, self.remote_port)
            # A short receive timeout lets the loop notice stop() promptly
            # while still handing over each datagram as soon as it arrives,
            # instead of sleeping between non-blocking polls.
            self._socket.settimeout(self.POLL_INTERVAL)
            while self.running:
                try:
                    data, addr = self._socket.recvfrom(32768)
                except socket.timeout:
                    continue  # Give self.stop() a chance to trigger
                self._log_packet('Packet received via %s\n%s', addr, data)
                if known_client is None:
                    known_client = addr
                if addr == known_client:
                    self._log_packet('Proxying request packet to %s\n%s',
                                     known_server, data)
                    self._socket.sendto(data, known_server)
                    self._record(MSG_TYPE_REQUEST, data)
                else:
                    self._log_packet('Proxying response packet to %s\n%s',
                                     known_client, data)
                    self._socket.sendto(data, known_client)
                    self._record(MSG_TYPE_RESPONSE, data)
            self.log.info('%s stopped!', self.name)
        finally:
            self._socket.close()

    def stop(self):
        """Ask the relay loop to end; it exits within one poll interval."""
        self.log.debug('Stopping %s...', self.name)
        self.running = False


if __name__ == '__main__':
    from queue import Queue

    # Level 0 lets the proxy's debug output (including hexdumps) through.
    logging.basicConfig(level=0)
    stat_queue = Queue()

    # Two separate proxy sessions, both reporting into the same queue.
    for _ in range(2):
        with UdpProxy('192.168.1.1', 161, stat_queue) as proxied_port:
            proxy_endpoint = '127.0.0.1:%s' % proxied_port
            print(snmp.get('1.3.6.1.2.1.1.2.0', proxy_endpoint, 'testing'))

    # Print every record the proxies collected.
    while not stat_queue.empty():
        print(stat_queue.get())
        stat_queue.task_done()
Run Code Online (Sandbox Code Playgroud)

正如 __main__ 部分所示,它的用法很简单:

    from queue import Queue

    # Each proxied packet is reported on this queue.
    stat_queue = Queue()

    # Point the SNMP client at the local proxy port instead of the device.
    with UdpProxy('192.168.1.1', 161, stat_queue) as proxied_port:
        proxy_endpoint = '127.0.0.1:%s' % proxied_port
        print(snmp.get('1.3.6.1.2.1.1.2.0', proxy_endpoint, 'testing'))

    # Print every record the proxy collected.
    while not stat_queue.empty():
        print(stat_queue.get())
        stat_queue.task_done()
Run Code Online (Sandbox Code Playgroud)

需要注意的一点:本例中的 snmp 模块只是执行一次 subprocess.check_output() 来启动 snmpget 子进程。