使用Python中的ZMQ在客户端/服务器配置中进行嗅探器/监视器

fla*_*nco 4 python sockets zeromq pyzmq

我通过ZeroMQ实现了一个客户端/服务器,我想添加一个嗅探器/监视器来捕获两者之间的通信.

              client <---------> server
              (REQ)       |         (REP)
                          | 
                          |
                          v
                        sniffer  <-this is what I want to add
Run Code Online (Sandbox Code Playgroud)

如果客户端/服务器通过套接字5555进行通信,那么我怎么能添加一个嗅探器来监听同一个套接字呢?有没有办法区分哪个消息来自客户端,哪个消息来自服务器?有人可以分享经验吗?

根据Jan的回答编辑

配置将变为:

client [REQ]----->[ROUTER:4444] monitor [DEALER]------->[REP:5555] server
                              [PUB:7777]
                                  ^
                                  |
                                  |
                                  |
                                  | 
                                [SUB] 
                            monitorclient(sniffer)  <-this is what I want to add
Run Code Online (Sandbox Code Playgroud)

箭头显示连接方向(前往绑定端口).

消息流动:

  • client- > monitor- > server- > monitor- >client
  • 而且monitor- >monitorclient

有一个更好的图片在这里.

Jan*_*sky 13

对于嗅探,我们需要一些中间部分.

zmq提供了几种选择

  • 编写自己的程序,接受一方请求,发送请求,获得响应,发送给原始请求者,并将此流量报告给您
  • 使用zmq.proxy - 但是,这需要最新版本的libzmq(zmq.zmq_version_info()> = 3),这在我的Ubuntu 14.04上目前甚至不可用,所以我跳过这个.
  • 使用MonitoredQueue - 这是你可能想要的.这提供了在前端和后端之间交换消息的循环,同时发布/推送/发送到另一个套接字.

计划

此解决方案基于pyzmq doc中的MonitoredQueue示例

服务器绑定到端口5555

服务器将绑定到端口5555.与其他示例不同,我将您的服务器保持为固定部分,而不是更改它连接到MontitoredQueue.但是,这样的交换不是问题,也不会产生任何问题(只要你正确调整MonitoredQueue).

MonitoredQueue绑定到端口4444,连接到端口5555,在端口7777上发布流量

MonitoredQueue位于客户端和服务器之间.它侦听端口4444,向服务器发送请求并将响应发送回客户端.同时,任何传递的消息都将在PUB套接字上以相应的前缀"in"或"out"发布.我们稍后会看到,这些不仅包含前缀和请求/响应,还包含客户端的身份.

客户端连接到端口4444

客户端可以通过端口5555直接连接到服务器,但这不允许我们嗅探流量.出于这个原因,我们将客户端连接到端口4444,MonitoredQueue等待服务器和sniff.

您将看到,客户端和服务器不必更改一行代码即可参与此交换.

真实的代码

server.py

在我们的示例中,服务器需要一个可以转换为整数的字符串,并返回一个doubled值的字符串.

import zmq

def double_server(server_url="tcp://*:5555"):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(server_url)
    print "server started..."
    while True:
        req = socket.recv()
        print "server received request", req
        result = str(2*int(req))
        socket.send(result)
        print "server replied with", result

if __name__ == "__main__":
    double_server()
Run Code Online (Sandbox Code Playgroud)

client.py

我们的客户将尝试5次在localhost上的端口4444上询问一些结果.

import zmq

def client(server_url="tcp://localhost:4444"):
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    # socket.setsockopt(zmq.IDENTITY, "client_id_abc") # see Conclusions
    socket.connect(server_url)

    for i in range(5):
        print "request", i
        socket.send(str(i))
        res = socket.recv()
        print i, "result: ", res

if __name__ == "__main__":
    client()
Run Code Online (Sandbox Code Playgroud)

您现在可以尝试直接连接到端口5555以查看它是否有效,但是对于我们的嗅探,它必须与MonitoredQueue通信.

monitor.py

这就是所有的魔力.pyzmq已经提供设备MonitoredQueue,所以我们可以简单地使用它.

import zmq
from zmq.devices.monitoredqueuedevice import MonitoredQueue
from zmq.utils.strtypes import asbytes

def monitoredqueue(frontend_url="tcp://*:4444", server_url="tcp://localhost:5555", capture_url="tcp://*:7777"):
    mondev = MonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB, asbytes("in"), asbytes("out"))
    mondev.bind_in(frontend_url)
    mondev.connect_out(server_url)
    mondev.bind_mon(capture_url)
    mondev.setsockopt_in(zmq.HWM, 1)
    mondev.start()
    print "monitored queue started"

if __name__ == "__main__":
    monitoredqueue()
Run Code Online (Sandbox Code Playgroud)

关于套接字类型和别名的注意事项:

  • zmq.ROUTER曾经被称为zmq.XREP
  • zmq.DEALER曾经被称为zmq.XREQ
  • 这些别名仍然有效.

MonitoredQueue将在端口7777上发布zmq.PUB套接字上传递的每条消息.这些消息将以"in"和"out"为前缀,并且还将包含一个带有标识字符串的帧.此标识字符串由ROUTER套接字分配,在交换期间,它对所有连接的客户端都是唯一的.该身份是所谓的信封的一部分,来自由空帧界定的请求/回复消息(很快就会看到).

monitorclient.py

此监控客户端仅用于显示,如何获取嗅探信息.

它订阅了端口7777,由监视器(MonitoredQueue)提供并打印出来.使用多部分消息很重要,否则我们会遗漏一些信息.

import zmq

def monitorclient(server_url="tcp://localhost:7777"):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(server_url)
    socket.setsockopt(zmq.SUBSCRIBE, "")
    print "started monitoring client"

    while True:
        res = socket.recv_multipart()
        print res

if __name__ == "__main__":
    monitorclient()
Run Code Online (Sandbox Code Playgroud)

运行

我们需要打开4个控制台,每个控制台都会启动一个python脚本

首先启动服务器:

$ python server.py
Run Code Online (Sandbox Code Playgroud)

启动MonitoredQueue

$ python monitor.py
Run Code Online (Sandbox Code Playgroud)

启动客户端,阅读嗅探消息

$ python monitorclient.py
Run Code Online (Sandbox Code Playgroud)

最后,启动客户端尝试从MonitoredQueue代理的服务器获得一些响应

$ python client.py
request 0
0 result:  0
request 1
1 result:  2
request 2
2 result:  4
request 3
3 result:  6
request 4
4 result:  8
Run Code Online (Sandbox Code Playgroud)

结果如预期.

现在检查server.py输出:

$ python server.py
server received request 0
server replied with 0
server received request 1
server replied with 2
server received request 2
server replied with 4
server received request 3
server replied with 6
server received request 4
server replied with 8
Run Code Online (Sandbox Code Playgroud)

毫不奇怪,一切顺利.

我们monitor.py不打印任何东西,我们将不得不检查输出monitorclient.py

$ python monitorclient.py 
started monitoring client
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '0']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '1']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '2']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '3']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '6']
['in', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '4']
['out', '\x00\xc4\x84\x1c\xf2\xc2.@\xd3\x86cN\x0e\x06\x7f\xaf\x0b', '', '8']
Run Code Online (Sandbox Code Playgroud)

在这里,您可以看到所有10条消息的打印输出,5条请求,5条响应.

每一个具有这样的结构[prefix, identity, emptyframe, message],其中

  • prefix 是"在"还是"在外"
  • identity是MonitoredQueues分配给特定客户端的字符串.每次客户端连接时,此标识可能会更改.作为奖励,我们可能连接多个客户端,仍然有机会区分不同的客户端.如果您需要特定的客户端身份,见注释行client.pysocket.setsockopt(zmq.IDENTITY, "client_id_abc").如果您取消注释,您将看到"client_id_abc"您的客户身份.
  • emptyframe被视为''并且正在从消息数据中划分包络.
  • message 客户问的是什么或服务器回复了什么.

结论

  • 嗅探工作,PyZMQ已经为此提供了MonitoredQueue设备
  • 通过zmq.PUB嗅探不会阻止任何通信,你可能只是忽略了嗅探数据,一切都会起作用.
  • 对于生产来说,实际上将MonitoredQueue固定在系统的一部分,从而绑定到已知的IP地址和端口.这将需要在服务器上进行更改,这必须连接(而不是当前绑定).这样的改变是微不足道的,不会影响代码和行为的其余部分.如果您只有一个要监视的端点,您还可以将监视器嵌入到服务器中(这需要2个线程,一个用于服务器,另一个用于监视器).
  • zmq 对于这类任务来说,这是伟大的"乐高".