Ant*_*ine 4 python asynchronous publish-subscribe zeromq python-asyncio
我正在尝试编写基于Asyncio的简单程序和使用ZeroMQ实现的发布/订阅设计模式.出版商有2个协同程序; 一个用于侦听传入的订阅,另一个用于将值(通过HTTP请求获取)发布到订阅者.订户订阅特定参数(在这种情况下为城市名称),并等待该值(该城市的温度).
这是我的代码:
publisher.py
#!/usr/bin/env python
import json
import aiohttp
import aiozmq
import asyncio
import zmq
class Publisher:
BIND_ADDRESS = 'tcp://*:10000'
def __init__(self):
self.stream = None
self.parameter = ""
@asyncio.coroutine
def main(self):
self.stream = yield from aiozmq.create_zmq_stream(zmq.XPUB, bind=Publisher.BIND_ADDRESS)
tasks = [
asyncio.async(self.subscriptions()),
asyncio.async(self.publish())]
print("before wait")
yield from asyncio.wait(tasks)
print("after wait")
@asyncio.coroutine
def subscriptions(self):
print("Entered subscriptions coroutine")
while True:
print("New iteration of subscriptions loop")
received = yield from self.stream.read()
first_byte = received[0][0]
self.parameter = received[0][-len(received[0])+1:].decode("utf-8")
# Subscribe request
if first_byte == 1:
print("subscription request received for parameter "+self.parameter)
# Unsubscribe request
elif first_byte == 0:
print("Unsubscription request received for parameter "+self.parameter)
@asyncio.coroutine
def publish(self):
print("Entered publish coroutine")
while True:
if self.parameter:
print("New iteration of publish loop")
# Make HTTP request
url = "http://api.openweathermap.org/data/2.5/weather?q="+self.parameter
response = yield from aiohttp.request('GET', url)
assert response.status == 200
content = yield from response.read()
# Decode JSON string
decoded_json = json.loads(content.decode())
# Get parameter value
value = decoded_json["main"]["temp"]
# Publish fetched values to subscribers
message = bytearray(self.parameter+":"+str(value),"utf-8")
print(message)
pack = [message]
print("before write")
yield from self.stream.write(pack)
print("after write")
yield from asyncio.sleep(10)
test = Publisher()
loop = asyncio.get_event_loop()
loop.run_until_complete(test.main())
Run Code Online (Sandbox Code Playgroud)
subscriber.py
#!/usr/bin/env python
import zmq
class Subscriber:
XSUB_CONNECT = 'tcp://localhost:10000'
def __init__(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.XSUB)
self.socket.connect(Subscriber.XSUB_CONNECT)
def loop(self):
print(self.socket.recv())
self.socket.close()
def subscribe(self, parameter):
self.socket.send_string('\x01'+parameter)
print("Subscribed to parameter "+parameter)
def unsubscribe(self, parameter):
self.socket.send_string('\x00'+parameter)
print("Unsubscribed to parameter "+parameter)
test = Subscriber()
test.subscribe("London")
while True:
print(test.socket.recv())
Run Code Online (Sandbox Code Playgroud)
这是输出:
订阅方:
$ python3 subscriber.py
Subscribed to parameter London
b'London:288.15'
Run Code Online (Sandbox Code Playgroud)
出版方:
$ python3 publisher.py
before wait
Entered subscriptions coroutine
New iteration of subscriptions loop
Entered publish coroutine
subscription request received for parameter London
New iteration of subscriptions loop
New iteration of publish loop
bytearray(b'London:288.15')
before write
Run Code Online (Sandbox Code Playgroud)
该计划被困在那里.
如您所见,"before write"
输出中显示消息并发送消息,但"after write"
不会显示.所以,我认为可能会引发一个异常,并在self.stream.write(pack)
调用堆栈中的某个位置捕获.
如果我发送KeyboardInterrupt
给发布者,这是我得到的:
Traceback (most recent call last):
File "publisher.py", line 73, in <module>
loop.run_until_complete(test.main())
File "/usr/lib/python3.4/asyncio/base_events.py", line 304, in run_until_complete
self.run_forever()
File "/usr/lib/python3.4/asyncio/base_events.py", line 276, in run_forever
self._run_once()
File "/usr/lib/python3.4/asyncio/base_events.py", line 1136, in _run_once
event_list = self._selector.select(timeout)
File "/usr/lib/python3.4/selectors.py", line 432, in select
fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Task exception was never retrieved
future: <Task finished coro=<publish() done, defined at publisher.py:43> exception=TypeError("'NoneType' object is not iterable",)>
Traceback (most recent call last):
File "/usr/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = coro.send(value)
File "publisher.py", line 66, in publish
yield from self.stream.write(pack)
TypeError: 'NoneType' object is not iterable
Task was destroyed but it is pending!
task: <Task pending coro=<subscriptions() running at publisher.py:32> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/lib/python3.4/asyncio/tasks.py:399]>
Run Code Online (Sandbox Code Playgroud)
所以我猜我的问题实际上就是这个错误:TypeError: 'NoneType' object is not iterable
但是我不知道是什么导致它.
这里出了什么问题?
问题是你正在尝试yield from
调用self.stream.write()
,但stream.write
实际上并不是协同程序.当你调用yield from
一个项目时,Python内部调用iter(item)
.在这种情况下,调用将write()
返回None
,因此Python正在尝试iter(None)
- 因此您看到的异常.
要解决它,你应该write()
像普通函数一样调用.如果您想要等到write
刷新并发送给阅读器,yield from stream.drain()
请在拨打电话后使用write()
:
print("before write")
self.stream.write(pack)
yield from self.stream.drain()
print("after write")
Run Code Online (Sandbox Code Playgroud)
另外,为了确保在publish
不需要Ctrl + C的情况下引发异常,请使用asyncio.gather
而不是asyncio.wait
:
yield from asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)
使用时asyncio.gather
,任何内部任务抛出的异常tasks
都将被重新引发.