Luc*_*lla 10 c++ python tcp zeromq pyzmq
我只能找到旧的C++源代码示例.无论如何,我做了我的,基于他们.这是我在python中的发布者:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
msg = "hello"
socket.send_string(msg)
print("sent "+ msg)
sleep(5)
Run Code Online (Sandbox Code Playgroud)
这是C++中的订阅者:
void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
zmq_msg_t msg;
int rc;
rc = zmq_msg_init( & msg);
assert(rc == 0);
std::cout << "waiting for message..." << std::endl;
rc = zmq_msg_recv( & msg, subscriber, 0);
assert(rc == 1);
std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
zmq_msg_close( & msg);
}
Run Code Online (Sandbox Code Playgroud)
最初,我试过,zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") );但我想我应该收到一切,如果我没有设置这个,对吧?所以我对此发表了评论.
当我运行代码时,我会永远看到"等待消息......".
我试着用TCP来监听TCP流量tcpdump.事实证明,当发布者打开时,我看到5563端口上有很多垃圾,当我关闭发布者时,他们会停止.当我尝试一个PUSH/PULL方案时,我可以看到明文消息tcpdump.(我尝试使用nodejs并使用c ++进行推送并且它有效).
我能做错什么?
我尝试不同的组合.bind(),.connect(),localhost,127.0.0.1,但他们不会工作.
更新:我刚刚读到我必须订阅某些内容,所以我确实zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 );订阅了所有内容,但我仍然没有收到任何内容
PyZMQ的版本为17.0.0.b3,并具有ZeroMQ 4.2.3
C++有ZeroMQ 4.2.2
更新2:
对4.2.3的更新都不起作用.
“我想如果不设置这个,我应该收到所有东西,对吗? ”
不,这不是正确的假设。您可能会喜欢我这里其他ZeroMQ帖子的集合,其中包含{ unicode | 序列号}问题和{Performance- | 流量-}-影响实际策略(使用ZeroMQ在异构分布式系统的设计中可能遇到的实际策略(SUB早期ZeroMQ版本的-侧主题过滤器处理,和/或PUB较新版本的-侧处理)。
(任何其他可伸缩的正式通信原型模式,如观察到的 PUSH/PULL,都不会对订阅策略起作用,因此将独立于针对一组主题过滤器列表的订阅匹配处理。)
.send()完全是:让我们模拟一个快速的pythonic接收器,以查看发送器是否确实发送了任何东西:
import zmq
aContext = zmq.Context() # .new Context
aSUB = aContext.socket( zmq.SUB ) # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" ) # .connect
aSUB.setsockopt( zmq.LINGER, 0 ) # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # .set T-filter
MASK = "INF: .recv()-ed this:[{0:}]\n: waited {1: > 7d} [us]"
aClk = zmq.Stopwatch();
while True:
try:
aClk.start(); print MASK.format( aSUB.recv(),
aClk.stop()
)
except ( KeyboardInterrupt, SystemExit ):
pass
break
pass
aSUB.close() # .close ALWAYS!
aContext.term() # .term ALWAYS!
Run Code Online (Sandbox Code Playgroud)
无论PUB-sender实际上是.send()通过有线方式发送的,还是实际的消息到达时间[us],它都应该报告(在其中,ZeroMQ很高兴包含了此工具,用于调试和性能/延迟调整)。
如果您INF:在屏幕上看到实时消息时确实确认了ACK ,请使其继续运行,现在可以继续进行下一步了。
#include <zmq.h>
void *aContext = zmq_ctx_new();
void *aSUB = zmq_socket( aContext, ZMQ_SUB ); std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
zmq_connect( aSUB, "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_LINGER, 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
int rc;
while (true) {
zmq_msg_t msg; /* Create an empty ØMQ message */
rc = zmq_msg_init (&msg); assert (rc == 0 && "EXC: in zmq_msg_init() call" );
std::cout << "INF: .. zmq_msg_init() done" << std::endl;
rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
zmq_msg_close (&msg); /* Release message */
std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
}
zmq_close( aSUB ); std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
Run Code Online (Sandbox Code Playgroud)
是我,是问这个问题的人。
我设法通过在 python 中交换 socket.bind("tcp://*:5563")来开始工作,socket.connect("tcp://dns_address_of_my_dcker_container:5564")
并在 C++ 中zmq_connect(subscriber, "tcp://localhost:5563")交换为zmq_bind(subscriber, "tcp://*:5563")
我在网上找到的例子说我应该用于bind发布者和connect订阅者,但它对我来说没有任何作用。有人知道为什么吗?
ZeroMQ 文档说明如下:
zmq_bind() 函数将套接字绑定到本地端点,然后接受该端点上的传入连接。
zmq_connect() 函数将套接字连接到端点,然后接受该端点上的传入连接。
我不清楚发生了什么变化,但它确实有效。