Lio*_*cer 5 python python-asyncio aiohttp python-3.5
I'm new to asyncio and aiohttp. I'm currently getting the errors below, and I'm not sure why I get an InvalidStateError for my asyncio future and a RuntimeError for my session:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 184, in _run_module_as_main
"__main__", mod_spec)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 100, in <module>
sys.exit(main(sys.argv))
File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 92, in main
poster.post()
File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 87, in post
results = event_loop.run_until_complete(self.async_post_events(events))
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 79, in async_post_events
task.add_done_callback(self.send_oracle, task.result(), session)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 268, in result
raise InvalidStateError('Result is not ready.')
asyncio.futures.InvalidStateError: Result is not ready.
Task exception was never retrieved
future: <Task finished coro=<Poster.async_post_event() done, defined at /Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py:62> exception=RuntimeError('Session is closed',)>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 64, in async_post_event
async with session.post(self.endpoint, data=event) as resp:
File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__
self._resp = yield from self._coro
File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 161, in _request
raise RuntimeError('Session is closed')
RuntimeError: Session is closed
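What I want to do is POST an event to one endpoint and then POST that same event to a second endpoint. The second POST should run as another async method, used as a callback.
Here is my code: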
async def async_post_event(self, event, session):
    async with session.post(self.endpoint, data=event) as resp:
        event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
        event["tracer"]["post"]["statusCode"] = await resp.status
        return event

async def send_oracle(self, event, session):
    async with session.post(self.oracle, data=event) as resp:
        return event["event"]["event_header"]["event_id"], await resp.status

async def async_post_events(self, events):
    tasks = []
    conn = aiohttp.TCPConnector(verify_ssl=self.secure)
    async with aiohttp.ClientSession(connector=conn) as session:
        for event in events:
            task = asyncio.ensure_future(self.async_post_event(event, session))
            task.add_done_callback(self.send_oracle, task.result(), session)
            tasks.append(task)
    return await asyncio.gather(*tasks)

def post(self):
    event_loop = asyncio.get_event_loop()
    try:
        events = [self.gen_random_event() for i in range(self.num_post)]
        results = event_loop.run_until_complete(self.async_post_events(events))
        print(results)
    finally:
        event_loop.close()
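add_done_callback accepts a plain callback, not a coroutine. The callback is invoked with the finished future as its only argument, so you cannot pass it extra arguments this way. Note also that task.result() is evaluated immediately while the arguments are being built, before the task has had a chance to run, which is what raises InvalidStateError. Besides, add_done_callback is part of a very low-level API that casual developers are better off avoiding.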
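To make the callback point concrete, here is a minimal sketch using only asyncio. The `work` coroutine is a hypothetical stand-in for a real request, and `asyncio.run` assumes Python 3.7 or later (on 3.5 you would use `loop.run_until_complete` instead).

```python
import asyncio


async def work():
    # Hypothetical stand-in for a real request.
    await asyncio.sleep(0)
    return 42


async def main():
    seen = []
    task = asyncio.ensure_future(work())

    # The task has only been scheduled, not run, so result() is not available yet.
    try:
        task.result()
    except asyncio.InvalidStateError:
        seen.append("not ready")

    # A done callback is a plain function that receives the finished future.
    task.add_done_callback(lambda fut: seen.append(fut.result()))

    await task
    # Yield once more so any pending callbacks have certainly run.
    await asyncio.sleep(0)
    return seen


result = asyncio.run(main())
print(result)
```

Inside the callback the future is already done, so calling `fut.result()` there is safe.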
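But your main mistake is that session.post() ends up being called outside the ClientSession async context manager, after the session has already been closed. The stack trace points at exactly that.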
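The effect can be reproduced without any network access. In the sketch below, `FakeSession` is a hypothetical stand-in written for illustration, not an aiohttp class; it only mimics the "closed on exit" behaviour. `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a client session; not part of aiohttp."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self.closed = True

    async def post(self, url):
        await asyncio.sleep(0)  # yield to the loop, as a real request would
        if self.closed:
            raise RuntimeError("Session is closed")
        return 200


async def gather_outside():
    async with FakeSession() as session:
        tasks = [asyncio.ensure_future(session.post("u")) for _ in range(2)]
    # The block has exited, so the session is closed before any task runs.
    return await asyncio.gather(*tasks, return_exceptions=True)


async def gather_inside():
    async with FakeSession() as session:
        tasks = [asyncio.ensure_future(session.post("u")) for _ in range(2)]
        # Awaiting here keeps the session open until every task has finished.
        return await asyncio.gather(*tasks)


outside = asyncio.run(gather_outside())
inside = asyncio.run(gather_inside())
print(outside, inside)
```

Creating a task only schedules it; the work happens when the loop gets control, which is why where you await matters.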
I've modified your snippet to get something that looks like working code:
import asyncio
import time

import aiohttp

# The functions below are methods of the Poster class.

async def async_post_event(self, event, session):
    # First POST: record when it completed and the status it returned.
    async with session.post(self.endpoint, data=event) as resp:
        event["tracer"]["post"]["timestamp"] = time.time() * 1000.0
        # resp.status is a plain int attribute, so it is not awaited.
        event["tracer"]["post"]["statusCode"] = resp.status
    # Second POST: forward the same event to the oracle endpoint.
    async with session.post(self.oracle, data=event) as resp:
        return event["event"]["event_header"]["event_id"], resp.status

async def async_post_events(self, events):
    coros = []
    conn = aiohttp.TCPConnector(verify_ssl=self.secure)
    async with aiohttp.ClientSession(connector=conn) as session:
        for event in events:
            coros.append(self.async_post_event(event, session))
        # Await inside the block so the session is still open while requests run.
        return await asyncio.gather(*coros)

def post(self):
    event_loop = asyncio.get_event_loop()
    try:
        events = [self.gen_random_event() for i in range(self.num_post)]
        results = event_loop.run_until_complete(self.async_post_events(events))
        print(results)
    finally:
        event_loop.close()
You could extract the two POSTs in async_post_event into separate coroutines, but the main idea stays the same.
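One possible shape for that split is sketched below. The coroutine names and the event layout are hypothetical, and `asyncio.sleep(0)` stands in for the real `session.post` calls so the example runs without a server. `asyncio.run` assumes Python 3.7 or later.

```python
import asyncio


async def post_event(event):
    # Hypothetical stand-in for the POST to the first endpoint.
    await asyncio.sleep(0)
    event["posted"] = True
    return event


async def send_oracle(event):
    # Hypothetical stand-in for the follow-up POST to the second endpoint.
    await asyncio.sleep(0)
    return event["id"], 200


async def post_then_notify(event):
    # Plain sequential awaits replace the done callback.
    posted = await post_event(event)
    return await send_oracle(posted)


async def main(events):
    # Each event runs its own two-step chain; the chains run concurrently.
    return await asyncio.gather(*(post_then_notify(e) for e in events))


results = asyncio.run(main([{"id": 1}, {"id": 2}]))
print(results)
```

Awaiting the second coroutine directly keeps the ordering explicit and lets exceptions propagate to `gather` in the usual way.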