小编Ser*_*ash的帖子

如何将参数传递给 celery 的任务?

对于我的网络应用程序,我使用celery后端redis。如果我有任务的ID,官方FAQ会通过以下方式获取任务的结果。

result = my_task.AsyncResult(task_id)
result.get()
Run Code Online (Sandbox Code Playgroud)

我可以轻松访问任务主体中的参数:

@app.task
def my_task(foo, bar, baz=None):
    kwargs = self.request.kwargs
    args = self.request.args
Run Code Online (Sandbox Code Playgroud)

有没有办法从 AsyncResult 或任何只有任务 ID 的地方获取args和获取?kwargs

python redis celery

7
推荐指数
1
解决办法
9281
查看次数

如何使用asyncio计划​​和取消任务

我正在编写客户端 - 服务器应用程序.连接时,客户端每秒向服务器发送一个"心跳"信号.在服务器端,我需要一种机制,我可以在其中添加异步执行的任务(或协程或其他).此外,当我停止发送"心跳"信号时,我想从客户端取消任务.

换句话说,当服务器启动任务时,它具有一种超时或ttl,例如3秒.当服务器收到"心跳"信号时,它会将计时器重置另外3秒,直到任务完成或客户端断开连接(停止发送信号).

以下是从pymotw.com上的asyncio教程取消任务的示例.但是这里的任务在event_loop开始之前被取消了,这对我来说并不合适.

import asyncio

async def task_func():
    print('in task_func')
    return 'the result'


event_loop = asyncio.get_event_loop()
try:
    print('creating task')
    task = event_loop.create_task(task_func())

    print('canceling task')
    task.cancel()

    print('entering event loop')
    event_loop.run_until_complete(task)
    print('task: {!r}'.format(task))
except asyncio.CancelledError:
    print('caught error from cancelled task')
else:
    print('task result: {!r}'.format(task.result()))
finally:
    event_loop.close()
Run Code Online (Sandbox Code Playgroud)

python python-asyncio

6
推荐指数
1
解决办法
1万
查看次数

如何等待对象改变状态

在我的async处理程序中,我想等到任务的状态发生变化.现在,我只是在无限循环中检查状态并等待.这是一个例子,wait_until_done函数:

import asyncio


class LongTask:
    state = 'PENDING'

my_task = LongTask()


def done():
    my_task.state = 'DONE'

async def wait_until_done():
    while True:
        if my_task.state == 'PENDING':
            await asyncio.sleep(2)
        else:
            break
    print("Finally, the task is done")


def main(loop, *args, **kwargs):
    asyncio.ensure_future(wait_until_done())
    loop.call_later(delay=5, callback=done)

loop = asyncio.get_event_loop()
main(loop)
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

这样做有更好的方法吗?

python python-asyncio python-3.5

5
推荐指数
1
解决办法
3087
查看次数

为什么asyncio的run_in_executor阻止龙卷风的get处理程序?

我想在龙卷风的异步GET请求处理程序中运行一个缓慢的阻止方法(实际上是从第三方库中)。将该方法设为:

def blocking_method(uid):
    print("slow method started: ", uid)
    time.sleep(10)
    print("slow method done: ", uid)
    return "slow method ({}) result".format(uid)
Run Code Online (Sandbox Code Playgroud)

此外,我更喜欢在asyncio的事件循环中运行龙卷风服务器:

if __name__ == '__main__':
    tornado.platform.asyncio.AsyncIOMainLoop().install()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(make_app())
    loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

我知道@run_in_executor装饰器,但是它不适合我,因为我使用asyncio。要在异步协程中运行阻止方法,应使用的run_in_executor方法asyncio.get_event_loop()。这是一个示例,如何从此答案中做到这一点

import asyncio

async def main():
    loop = asyncio.get_event_loop()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    future1 = loop.run_in_executor(executor, blocking_method, 1)
    future2 = loop.run_in_executor(executor, blocking_method, 2)
    response1 = await future1
    response2 = await future2
    print(response1)
    print(response2)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

而且效果很好,这是先前脚本的输出:

slow …
Run Code Online (Sandbox Code Playgroud)

python tornado python-asyncio

0
推荐指数
1
解决办法
1521
查看次数

标签 统计

python ×4

python-asyncio ×3

celery ×1

python-3.5 ×1

redis ×1

tornado ×1