我们如何在 Django 中创建异步 API?

hR *_*312 0 python django asynchronous celery django-celery

我想创建一个第三方聊天机器人 API,它是异步的,并在 10 秒暂停后回复“ok”。

import time

def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    return wait()
Run Code Online (Sandbox Code Playgroud)

我已经尝试了 celery,如下所示,我正在等待 celery 响应:

import time
from celery import shared_task

@shared_task
def wait():
    time.sleep(10)
    return "ok"

# views.py
def api(request):
    a = wait.delay()
    work = AsyncResult(a.id)
    while True:
        if work.ready():
           return work.get(timeout=1)
Run Code Online (Sandbox Code Playgroud)

但是这个解决方案是同步工作的,没有区别。我们如何在不要求用户继续请求直到收到结果的情况下使其异步?

gel*_*ida 6

正如@Blusky 的回答中提到的:异步 API 将存在于 django 3.X 中。不是之前

如果这不是一个选项,那么答案是否定的

还请注意,即使使用 django 3.X,访问数据库的任何 django 代码也不会是异步的,它必须在线程(线程池)中执行

Celery 用于后台任务或延迟任务,但 celery 永远不会返回 HTTP 响应,因为它没有收到它应该响应的 HTTP 请求。Celery 也不是 asyncio 友好的。

您可能不得不考虑更改您的架构/实现。看看你的整体问题,问问自己你是否真的需要一个带有 Django 的异步 API。

这个 API 是用于浏览器应用程序还是机器对机器应用程序?

您的客户可以使用网络套接字并等待答案吗?

您能否将服务器端的阻塞部分和非阻塞部分分开?将 django 用于所有非阻塞、所有周期性/延迟 (django + celery) 并使用 Web 服务器插件或 python ASGI 代码或 Web 套接字实现异步部分。

一些想法

使用 Django + nginx nchan(如果你的 web 服务器是 nginx)

链接到 nchan:https ://nchan.io/ 您的 API 调用将创建一个任务 id,启动一个 celery 任务,立即返回任务 id 或轮询 url。

例如,轮询 URL 将通过 nchan 长轮询通道进行处理。您的客户端连接到与 nchan 长轮询通道相对应的 url,并且 celery 会在您完成任务时将其解除阻塞(10 秒结束)

使用 Django + 一个 ASGI 服务器 + 一个手工编码的视图,并使用类似于 nginx nchan 的策略

同上逻辑,不过你不用nginx nchan,而是自己实现

使用 ASGI 服务器 + 非阻塞框架(或只是一些手工编码的 ASGI 视图)来处理所有阻塞 url,其余使用 Django。

它们可能通过数据库、本地文件或通过本地 http 请求交换数据。

保持阻塞并在您的服务器上抛出足够的工作进程/线程

这可能是最糟糕的建议,但如果它只是供个人使用,并且您知道将有多少并行请求,那么只需确保您有足够的 Django 工作人员,这样您就可以承受阻塞。在这种情况下,您会为每个慢速请求阻塞整个 Django 工作线程。

使用网络套接字。例如使用 Django 的通道模块

Websockets 可以通过 django 通道模块 ( pip install channels) ( https://github.com/django/channels )使用较早版本的 django (>= 2.2 ) 实现

您需要一个 ASGI 服务器来为异步部分提供服务。您可以使用例如 Daphne ot uvicorn(频道文档对此进行了很好的解释)

附录 2020-06-01:调用同步 django 代码的简单异步示例

以下代码使用 starlette 模块,因为它看起来非常简单和小巧

miniasyncio.py

import asyncio
import concurrent.futures
import os
import django
from starlette.applications import Starlette
from starlette.responses import Response
from starlette.routing import Route

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pjt.settings')
django.setup()

from django_app.xxx import synchronous_func1
from django_app.xxx import synchronous_func2

executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

async def simple_slow(request):
    """ simple function, that sleeps in an async matter """
    await asyncio.sleep(5)
    return Response('hello world')

async def call_slow_dj_funcs(request):
    """ slow django code will be called in a thread pool """
    loop = asyncio.get_running_loop()
    future_func1 = executor.submit(synchronous_func1)
    func1_result = future_func1.result()
    future_func2 = executor.submit(synchronous_func2)
    func2_result = future_func2.result()
    response_txt = "OK"
    return Response(response_txt, media_type="text/plain")

routes = [
    Route("/simple", endpoint=simple_slow),
    Route("/slow_dj_funcs", endpoint=call_slow_dj_funcs),
]

app = Starlette(debug=True, routes=routes)
Run Code Online (Sandbox Code Playgroud)

例如,您可以使用以下代码运行此代码

pip install uvicorn
uvicorn --port 8002 miniasyncio:app
Run Code Online (Sandbox Code Playgroud)

然后在您的 Web 服务器上将这些特定 url 路由到 uvicorn 而不是您的 django 应用程序服务器。