dfa*_*l07 14 sockets publish-subscribe zeromq
我在常见问题解答中注意到,在监控部分中,无法获得连接对等列表或在对等连接/断开连接时收到通知.
这是否意味着它也不可能从上游反馈中知道PUB/XPUB套接字知道它应该发布哪些主题?或者有没有办法访问这些数据?
我知道ZMQ> = 3.0" 支持发布者的PUB/SUB过滤 ",但我真正想要的是过滤我的应用程序代码,使用ZMQ有关订阅哪些主题的知识.
我的用例是我想发布有关机器人状态的信息.一些主题涉及主要的硬件操作,例如切换ADC上的选择线以读取IR值.
我在机器人上运行了一个发布者线程,当有实际订阅者时,它应该只执行"读取"以获取IR数据.但是,因为我只能将一个字符串提供给我的pub_sock.send,所以我总是要做昂贵的操作,即使ZMQ即将在没有订阅者时丢弃该消息.
我有一个实现,它使用反向通道REQ/REP套接字发送主题信息,我的应用程序可以在其发布循环中检查,从而只收集需要收集的数据.这似乎非常不优雅,因为ZMQ必须已经拥有我需要的数据,这可以通过它对发布者的过滤来证明.
我注意到在这个邮件列表消息中,OP似乎能够看到订阅消息被发送到XPUB套接字.
但是,没有提到他们是如何做到这一点的,而且我没有在文档中看到任何这样的能力(仍在寻找).也许他们只是使用Wireshark(查看XPUB套接字的上游订阅消息).
使用zmq.XPUB套接字类型,有一种方法可以检测新用户和离开用户.以下代码示例显示了如何:
# Publisher side
import zmq
ctx = zmq.Context.instance()
xpub_socket = ctx.socket(zmq.XPUB)
xpub_socket.bind("tcp://*:%d" % port_nr)
poller = zmq.Poller()
poller.register(xpub_socket)
events = dict(poller.poll(1000))
if xpub_socket in events:
msg = xpub_socket.recv()
if msg[0] == b'\x01':
topic = msg[1:]
print "Topic '%s': new subscriber" % topic
elif msg[0] == b'\x00':
topic = msg[1:]
print "Topic '%s': subscriber left" % topic
Run Code Online (Sandbox Code Playgroud)
请注意,zmq.XSUB套接字类型的订阅方式与"普通"不同zmq.SUB.代码示例:
# Subscriber side
import zmq
ctx = zmq.Context.instance()
# Subscribing of zmq.SUB socket
sub_socket = ctx.socket(zmq.SUB)
sub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # OK
sub_socket.connect("tcp://localhost:%d" % port_nr)
# Subscribing zmq.XSUB socket
xsub_socket = ctx.socket(zmq.XSUB)
xsub_socket.connect("tcp://localhost:%d" % port_nr)
# xsub_socket.setsockopt(zmq.SUBSCRIBE, "sometopic") # NOK, raises zmq.error.ZMQError: Invalid argument
xsub_socket.send_multipart([b'\x01', b'sometopic']) # OK, triggers the subscribe event on the publisher
Run Code Online (Sandbox Code Playgroud)
我还想指出zmq.XPUB_VERBOSEsocket选项.如果设置,则在套接字上接收所有订阅事件.如果未设置,则会过滤重复的订阅.另请参阅以下帖子:ZMQ:多个订阅者的XPUB套接字上没有订阅消息(Last Value Caching模式)
至少对于 XPUB/XSUB 套接字情况,您可以通过手动转发和处理包来保存订阅状态:
context = zmq.Context()
xsub_socket = context.socket(zmq.XSUB)
xsub_socket.bind('tcp://*:10000')
xpub_socket = context.socket(zmq.XPUB)
xpub_socket.bind('tcp://*:10001')
poller = zmq.Poller()
poller.register(xpub_socket, zmq.POLLIN)
poller.register(xsub_socket, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
break
if xpub_socket in events:
message = xpub_socket.recv_multipart()
# HERE goes some subscription handle code which inspects
# message
xsub_socket.send_multipart(message)
if xsub_socket in events:
message = xsub_socket.recv_multipart()
xpub_socket.send_multipart(message)
Run Code Online (Sandbox Code Playgroud)
(这是Python代码,但我猜C/C++看起来很相似)
我目前正在研究这个主题,我将尽快添加更多信息。
| 归档时间: |
|
| 查看次数: |
1872 次 |
| 最近记录: |