我把头发拉过这一头.我正在尝试使用zeromq和gevent的最简单的示例.我将此脚本更改为使用PUB/SUB套接字,当我运行它时,'server'套接字永远循环.如果我取消注释gevent.sleep(0.1)行,那么它按预期工作并产生另一个绿色线程,在这种情况下是客户端.
问题是,为什么我必须手动添加睡眠呼叫?我想当我导入zmq.green版本的zmq时,发送和接收调用是非阻塞的,并且在下面执行任务切换.
换句话说,为什么我必须添加gevent.sleep()调用以使此示例工作?在Jeff Lindsey的原始示例中,他正在执行REQ/REP套接字并且他不需要添加睡眠调用...但是当我将其更改为PUB/SUB时,我需要它来为此处理客户端.
#Notes: Code taken from slide: http://www.google.com/url?sa=t&rct=j&q=zeromq%20gevent&source=web&cd=27&ved=0CFsQFjAGOBQ&url=https%3A%2F%2Fraw.github.com%2Fstrangeloop%2F2011-slides%2Fmaster%2FLindsay-DistributedGeventZmq.pdf&ei=JoDNUO6OIePQiwK8noHQBg&usg=AFQjCNFa5g9ZliRVoN_yVH7aizU_fDMtfw&bvm=bv.1355325884,d.cGE
#Jeff Lindsey talk on gevent and zeromq
import gevent
from gevent import spawn
import zmq.green as zmq
context = zmq.Context()
def serve():
print 'server online'
socket = context.socket(zmq.PUB)
socket.bind("ipc:///tmp/jeff")
while True:
print 'send'
socket.send("World")
#gevent.sleep(0.1)
def client():
print 'client online'
socket = context.socket(zmq.SUB)
socket.connect("ipc:///tmp/jeff")
socket.setsockopt(zmq.SUBSCRIBE, '')
while True:
print 'recv'
message = socket.recv()
cl = spawn(client)
server = spawn(serve)
print 'joinall'
gevent.joinall([cl, server])
print 'end'
Run Code Online (Sandbox Code Playgroud)
我想当我导入zmq.green版本的zmq时,发送和接收调用是非阻塞的,并且在下面执行任务切换.
zmq.green只会在这些调用被阻塞时产生,如果它们准备好它就不会产生(没有什么可以等待的).在你的情况下,发送者总是准备好,所以它永远不会有理由屈服.
一些指示:
gevent.sleep(0),它不需要是有限的.socket.send只有在套接字未准备好发送(not (socket.events & zmq.POLLOUT))时才会阻塞,这对于PUB套接字来说实际上永远不会是真的(你会在HWM看到PUSH,DEALER等).zmq.green的含义是将send/recv转换为:
try:
socket.send(msg, zmq.NOBLOCK) # or recv
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
yield # and wait for socket to be ready, then try again
Run Code Online (Sandbox Code Playgroud)
所以如果带有NOBLOCK的send/recv总是成功的话,套接字永远不会产生.
换句话说:如果套接字无需等待,它就不会等待.
| 归档时间: |
|
| 查看次数: |
3015 次 |
| 最近记录: |