使用异步服务器的长时间运行任务

dmi*_*try 5 python tornado websocket python-asyncio aiohttp

我想每个人都知道如何处理django中的长期任务:使用芹菜和放松.但是,如果我想通过aiohttp(或龙卷风)获得websockets的好处呢?

假设我有很多CPU绑定任务,可能需要几秒钟到多个(5-10)分钟.在websocket循环中处理此任务并通知用户进度似乎是个好主意.没有ajax请求,对短任务的响应速度非常快.

async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:     
            answer_to_the_ultimate_question_of_life_the_universe_and_everything =\
                long_running_task(msg.data, NotificationHelper(ws))
            ws.send_str(json.dumps({
                'action': 'got-answer',
                'data': answer_to_the_ultimate_question_of_life_the_universe_and_everything,
            }))
    return ws
Run Code Online (Sandbox Code Playgroud)

但另一方面,按照我的理解,以这种方式提供服务的CPU绑定任务会阻塞整个线程.如果我有10个工作人员和11个想要使用应用程序的客户端,则在第一个客户端的任务完成之前,将不会提供第11个客户端.

也许,我应该运行在芹菜中看起来很大的任务和在主循环中看起来很小的任务?

所以,我的问题是:是否有任何好的设计模式用于为异步服务器提供长期运行的任务?

谢谢!

And*_*lov 8

只需运行长时间运行的CPU绑定任务,loop.run_in_executor()然后发送进度通知loop.call_soon_threadsafe().

如果您的工作不是CPU而是IO绑定(例如发送电子邮件),您可以通过loop.create_task()调用创建新任务.它看起来像产生新线程.

如果您无法使用发射后不管你需要使用持久性消息代理像RabbitMQ的方法(有https://github.com/benjamin-hodgson/asynqp库在ASYNCIO方式与兔通信).