obe*_*tet 12 python network-programming yield coroutine python-asyncio
我asyncio.Protocol.data_received在新的Python asyncio模块的回调中执行异步操作时遇到问题.
考虑以下服务器:
class MathServer(asyncio.Protocol):
@asyncio.coroutine
def slow_sqrt(self, x):
yield from asyncio.sleep(1)
return math.sqrt(x)
def fast_sqrt(self, x):
return math.sqrt(x)
def connection_made(self, transport):
self.transport = transport
#@asyncio.coroutine
def data_received(self, data):
print('data received: {}'.format(data.decode()))
x = json.loads(data.decode())
#res = self.fast_sqrt(x)
res = yield from self.slow_sqrt(x)
self.transport.write(json.dumps(res).encode('utf8'))
self.transport.close()
Run Code Online (Sandbox Code Playgroud)
与以下客户一起使用:
class MathClient(asyncio.Protocol):
def connection_made(self, transport):
transport.write(json.dumps(2.).encode('utf8'))
def data_received(self, data):
print('data received: {}'.format(data.decode()))
def connection_lost(self, exc):
asyncio.get_event_loop().stop()
Run Code Online (Sandbox Code Playgroud)
随着self.fast_sqrt被召唤,一切都按预期工作.
有self.slow_sqrt,它不起作用.
它也不适用self.fast_sqrt于@asyncio.coroutine装饰器data_received.
我觉得我在这里缺少一些基本的东西.
完整的代码在这里:
经测试:
两者的问题是相同的:在slow_sqrt,客户端/服务器只会挂起什么都不做.
看起来,这需要通过一个解耦Future- 尽管我仍然不确定这是否是正确的方法.
class MathServer(asyncio.Protocol):
@asyncio.coroutine
def slow_sqrt(self, x):
yield from asyncio.sleep(2)
return math.sqrt(x)
def fast_sqrt(self, x):
return math.sqrt(x)
def consume(self):
while True:
self.waiter = asyncio.Future()
yield from self.waiter
while len(self.receive_queue):
data = self.receive_queue.popleft()
if self.transport:
try:
res = self.process(data)
if isinstance(res, asyncio.Future) or \
inspect.isgenerator(res):
res = yield from res
except Exception as e:
print(e)
def connection_made(self, transport):
self.transport = transport
self.receive_queue = deque()
asyncio.Task(self.consume())
def data_received(self, data):
self.receive_queue.append(data)
if not self.waiter.done():
self.waiter.set_result(None)
print("data_received {} {}".format(len(data), len(self.receive_queue)))
def process(self, data):
x = json.loads(data.decode())
#res = self.fast_sqrt(x)
res = yield from self.slow_sqrt(x)
self.transport.write(json.dumps(res).encode('utf8'))
#self.transport.close()
def connection_lost(self, exc):
self.transport = None
Run Code Online (Sandbox Code Playgroud)
以下是Guido van Rossum 的回答:
解决方案很简单:将该逻辑编写为标记为的单独方法
@coroutine,并在data_received()使用时将 其触发async()(== Task()在本例中).之所以没有内置到协议中的原因是,如果是,则需要备用事件循环实现来处理协同程序.
def data_received(self, data):
asyncio.ensure_future(self.process_data(data))
@asyncio.coroutine
def process_data(self, data):
# ...stuff using yield from...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3587 次 |
| 最近记录: |