对于我的网络应用程序,我使用celery
后端redis
。如果我有任务的ID,官方FAQ会通过以下方式获取任务的结果。
result = my_task.AsyncResult(task_id)
result.get()
Run Code Online (Sandbox Code Playgroud)
我可以轻松访问任务主体中的参数:
@app.task
def my_task(foo, bar, baz=None):
kwargs = self.request.kwargs
args = self.request.args
Run Code Online (Sandbox Code Playgroud)
有没有办法从 AsyncResult 或任何只有任务 ID 的地方获取args
和获取?kwargs
我正在编写客户端 - 服务器应用程序.连接时,客户端每秒向服务器发送一个"心跳"信号.在服务器端,我需要一种机制,我可以在其中添加异步执行的任务(或协程或其他).此外,当我停止发送"心跳"信号时,我想从客户端取消任务.
换句话说,当服务器启动任务时,它具有一种超时或ttl,例如3秒.当服务器收到"心跳"信号时,它会将计时器重置另外3秒,直到任务完成或客户端断开连接(停止发送信号).
以下是从pymotw.com上的asyncio教程取消任务的示例.但是这里的任务在event_loop开始之前被取消了,这对我来说并不合适.
import asyncio
async def task_func():
print('in task_func')
return 'the result'
event_loop = asyncio.get_event_loop()
try:
print('creating task')
task = event_loop.create_task(task_func())
print('canceling task')
task.cancel()
print('entering event loop')
event_loop.run_until_complete(task)
print('task: {!r}'.format(task))
except asyncio.CancelledError:
print('caught error from cancelled task')
else:
print('task result: {!r}'.format(task.result()))
finally:
event_loop.close()
Run Code Online (Sandbox Code Playgroud) 在我的async
处理程序中,我想等到任务的状态发生变化.现在,我只是在无限循环中检查状态并等待.这是一个例子,wait_until_done
函数:
import asyncio
class LongTask:
state = 'PENDING'
my_task = LongTask()
def done():
my_task.state = 'DONE'
async def wait_until_done():
while True:
if my_task.state == 'PENDING':
await asyncio.sleep(2)
else:
break
print("Finally, the task is done")
def main(loop, *args, **kwargs):
asyncio.ensure_future(wait_until_done())
loop.call_later(delay=5, callback=done)
loop = asyncio.get_event_loop()
main(loop)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
这样做有更好的方法吗?
我想在龙卷风的异步GET请求处理程序中运行一个缓慢的阻止方法(实际上是从第三方库中)。将该方法设为:
def blocking_method(uid):
print("slow method started: ", uid)
time.sleep(10)
print("slow method done: ", uid)
return "slow method ({}) result".format(uid)
Run Code Online (Sandbox Code Playgroud)
此外,我更喜欢在asyncio的事件循环中运行龙卷风服务器:
if __name__ == '__main__':
tornado.platform.asyncio.AsyncIOMainLoop().install()
loop = asyncio.get_event_loop()
loop.run_until_complete(make_app())
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
我知道@run_in_executor
装饰器,但是它不适合我,因为我使用asyncio。要在异步协程中运行阻止方法,应使用的run_in_executor
方法asyncio.get_event_loop()
。这是一个示例,如何从此答案中做到这一点:
import asyncio
async def main():
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
future1 = loop.run_in_executor(executor, blocking_method, 1)
future2 = loop.run_in_executor(executor, blocking_method, 2)
response1 = await future1
response2 = await future2
print(response1)
print(response2)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
而且效果很好,这是先前脚本的输出:
slow …
Run Code Online (Sandbox Code Playgroud)