Refer to http://hintjens.wdfiles.com/local--files/main:files/cc1pe.pdf
Page 22 Chapter Divide and Conquer
Ventilator[PUSH]
___________________|____________________
| | |
[PULL]Worker[PUSH] [PULL]Worker[PUSH] [PULL]Worker[PUSH]
|__________________|___________________|
|
[PULL]Sink
// taskvent.c
// Socket to send messages on
void *context = zmq_ctx_new ();
void *sender = zmq_socket (context, ZMQ_PUSH);
zmq_bind (sender, "tcp://*:5557");
// Socket to send start of batch message on
void *sink = zmq_socket (context, ZMQ_PUSH);
zmq_connect (sink, "tcp://localhost:5558");
// taskwork.c
// Socket to receive messages on
void *context = zmq_ctx_new ();
void *receiver = zmq_socket (context, ZMQ_PULL);
zmq_connect …Run Code Online (Sandbox Code Playgroud) 我是消息队列的新手,现在我ZeroMQ在我的Linux服务器上使用.我PHP用来写客户端和服务器.这主要用于处理推送通知.
正如他们所展示的那样,我在单个I/O线程实例上使用基本的REQ- REP形式 - 通信模式ZMQContext.
这是最小化的zeromqServer.php代码:
include("someFile.php");
$context = new ZMQContext(1);
// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");
while (true) {
$request = $responder->recv();
printf ("Received request: [%s]\n", $request);
// -----------------------------------------------------------------
// Process push notifications here
//
sleep (1);
// -----------------------------------------------------------------
// Send reply back to client
$responder->send("Basic Reply");
}
Run Code Online (Sandbox Code Playgroud)
这是一个最小化的ZeroMQ 客户端:
$context = new ZMQContext();
// Socket to talk to server …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用zeromq框架实现pub子设计模式.我们的想法是推出订阅者,然后推出发布者.订阅者将收听100条消息,发布者将发布100条消息.到目前为止一切都那么好......然而实际发生的事情是,即使发布者发布时订户已经启动并运行,订阅者也不会收到所有消息(订阅者将收到100条消息,如果出版商将发送至少500条消息).似乎发布者发送的第一条消息不会发送给订阅者.
有任何想法吗?
提前谢谢,奥梅尔.
订阅者代码(在发布者之前发布)
int i=0;
zmq::context_t context (1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect("tcp://localhost:5556");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
zmq::message_t update;
subscriber.recv(&update);
i++;
std::cout<<"receiving :"<<i<<std::endl;
}
Run Code Online (Sandbox Code Playgroud)
发布商代码(在订阅者之后启动)
zmq::context_t context (1);
zmq::socket_t publisher (context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int i = 0;
for (int update_nbr = 0; update_nbr < 100; update_nbr++)
{
// Send message to all subscribers
zmq::message_t request (20);
time_t seconds;
seconds = time (NULL);
char update [20]="";
sprintf (update, "%ld", seconds);
memcpy …Run Code Online (Sandbox Code Playgroud) 我想知道,是否可以通过ZeroMQ与Arduino进行通信.我想使用Arduino来控制一些设备,并希望将大部分应用程序放在计算机上.为此,如果Arduino和控制器可以通过zeromq进行通信并说出WiFi或以太网屏蔽,那就太好了.那可能吗?
特别是,考虑到Arduino板上的当前芯片,原则上是否可行,并且有没有人在这方面取得成功?
试图编译从zeromq教程中的例子hello_world.c: http://zguide.zeromq.org/page:all#Ask-and-Ye-Shall-Receive 敢肯定我已经安装在OSX山狮的一切.
clang -Wall hwserver.c -o hwserver
Run Code Online (Sandbox Code Playgroud)
给我一个错误:
Undefined symbols for architecture x86_64:
"_zmq_bind", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_ctx_new", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_close", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_data", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_init", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_init_size", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_recv", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_msg_send", referenced from:
_main in hwserver-OgrEe6.o
"_zmq_socket", referenced from:
_main in hwserver-OgrEe6.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed …Run Code Online (Sandbox Code Playgroud) 当我在Windows(Windows 8)上尝试使用python的路由器示例时,我收到以下错误消息:
Traceback (most recent call last):
File "router.py", line 43, in <module>
client.bind("ipc://routing.ipc")
File "socket.pyx", line 432, in zmq.core.socket.Socket.bind (zmq\core\socket.c:3870)
File "checkrc.pxd", line 23, in zmq.core.checkrc._check_rc (zmq\core\socket.c:5712)
zmq.error.ZMQError: Protocol not supported
Run Code Online (Sandbox Code Playgroud)
因此,我认为zeromq的IPC传输通道不支持Windows(至少Windows 8).这是真的?
上
g++ actualApp.cpp -lzmq
Run Code Online (Sandbox Code Playgroud)
我明白了
actualApp.cpp:6:19: error: zmq.hpp: No such file or directory
actualApp.cpp: In function ‘int main()’:
actualApp.cpp:13: error: ‘zmq’ has not been declared
actualApp.cpp:13: error: expected `;' before ‘context’
actualApp.cpp:14: error: ‘zmq’ has not been declared
actualApp.cpp:14: error: expected `;' before ‘socket’
actualApp.cpp:15: error: ‘socket’ was not declared in this scope
actualApp.cpp:18: error: ‘zmq’ has not been declared
actualApp.cpp:18: error: expected `;' before ‘request’
actualApp.cpp:21: error: ‘request’ was not declared in this scope
actualApp.cpp:28: error: ‘zmq’ has not …Run Code Online (Sandbox Code Playgroud) 首先,我要感谢这个惊人的lib!我喜欢它.客户端将自己连接到服务器.服务器应该保存IP并稍后使用它(我真的需要IP).我找到答案:http://lists.zeromq.org/pipermail/zeromq-dev/2010-September/006381.html但我不明白我是如何从消息中获取IP的(XREP)...我想我只能读取ID,但IP由0MQ内部管理.他的第二个解决方案建议将IP作为消息的一部分发送,但我不明白如何获得"公共"-IP.我找到了帖子:在ZeroMQ中获取TCP地址信息
is pass bind a service to an ephemeral port, get a full connection endpoint ("tcp://ipaddress:port")
Run Code Online (Sandbox Code Playgroud)
我不明白这是怎么回事.他的意思是网络服务吗?在我看来,最好将IP从0MQ中取出(它已经拥有IP).我甚至会调整0MQ,如果有人可以指向保存IP的地方,找不到它.目前,套接字类型并不重要.我更喜欢smth REQ-REP之类的.谢谢!
我在常见问题解答中注意到,在监控部分中,无法获得连接对等列表或在对等连接/断开连接时收到通知.
这是否意味着它也不可能从上游反馈中知道PUB/XPUB套接字知道它应该发布哪些主题?或者有没有办法访问这些数据?
我知道ZMQ> = 3.0" 支持发布者的PUB/SUB过滤 ",但我真正想要的是过滤我的应用程序代码,使用ZMQ有关订阅哪些主题的知识.
我的用例是我想发布有关机器人状态的信息.一些主题涉及主要的硬件操作,例如切换ADC上的选择线以读取IR值.
我在机器人上运行了一个发布者线程,当有实际订阅者时,它应该只执行"读取"以获取IR数据.但是,因为我只能将一个字符串提供给我的pub_sock.send,所以我总是要做昂贵的操作,即使ZMQ即将在没有订阅者时丢弃该消息.
我有一个实现,它使用反向通道REQ/REP套接字发送主题信息,我的应用程序可以在其发布循环中检查,从而只收集需要收集的数据.这似乎非常不优雅,因为ZMQ必须已经拥有我需要的数据,这可以通过它对发布者的过滤来证明.
我注意到在这个邮件列表消息中,OP似乎能够看到订阅消息被发送到XPUB套接字.
但是,没有提到他们是如何做到这一点的,而且我没有在文档中看到任何这样的能力(仍在寻找).也许他们只是使用Wireshark(查看XPUB套接字的上游订阅消息).
我创建了一个类似于以下的套接字文件,并希望MQL5必须读取套接字的输出.请参阅以下python代码;
daemon.py
import socket
#import arcpy
def actual_work():
#val = arcpy.GetCellValue_management("D:\dem-merged\lidar_wsg84", "-95.090174910630012 29.973962146120652", "")
#return str(val)
return 'dummy_reply'
def main():
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
try:
sock.bind( ('127.0.0.1', 6666) )
while True:
data, addr = sock.recvfrom( 4096 )
reply = actual_work()
sock.sendto(reply, addr)
except KeyboardInterrupt:
pass
finally:
sock.close()
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
client.py
import socket
import sys
def main():
sock = socket.socket( socket.AF_INET, socket.SOCK_DGRAM )
sock.settimeout(1)
try:
sock.sendto('', ('127.0.0.1', 6666))
reply, _ = sock.recvfrom(4096)
print reply
except socket.timeout: …Run Code Online (Sandbox Code Playgroud)