在 Django restframework 中使用 python async/await

pla*_*rsh 6 django asynchronous python-3.x

我只是将一个旧项目升级到 Python 3.6,并发现有这些很酷的新 async / await 关键字。

我的项目包含一个网络爬虫,目前性能不是很好,大约需要 7 分钟才能完成。现在,由于我已经安装了 django restframework 来访问我的 django 应用程序的数据,我认为拥有一个 REST 端点会很好,我可以通过一个简单的 POST 请求从远程启动爬虫。

但是,我不希望客户端同步等待爬虫完成。我只想直接给他发爬虫已经启动的消息,在后台启动爬虫。

from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.conf import settings
from mycrawler import tasks

async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
    await tasks.update_all(deep_crawl, season, log_to_db)


@api_view(['POST', 'GET'])
def start(request):
    """
    Start crawling.
    """
    if request.method == 'POST':
        print("Crawler: start {}".format(request))

        deep = request.data.get('deep', False)
        season = request.data.get('season', settings.CURRENT_SEASON)

        # this should be called async
        update_all_async(season=season, deep_crawl=deep)

        return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
    else:
        return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
            "deep": "boolean",
            "season": "number"
        }}, status.HTTP_200_OK)
Run Code Online (Sandbox Code Playgroud)

我已经阅读了一些教程,以及如何使用循环和其他东西,但我真的不明白......在这种情况下我应该从哪里开始循环?

[编辑] 20/10/2017:

我现在使用线程解决了它,因为它确实是一个“即发即忘”的任务。但是,我仍然想知道如何使用 async/await 实现相同的目标。

这是我目前的解决方案:

import threading


@api_view(['POST', 'GET'])
def start(request):
    ...
    t = threading.Thread(target=tasks.update_all, args=(deep, season))
    t.start()
    ...
Run Code Online (Sandbox Code Playgroud)

cas*_*tel 6

引入异步支持后,这在 Django 3.1+ 中是可能的。

关于异步运行循环,您可以通过运行 Djangouvicorn或任何其他 ASGI 服务器而不是gunicorn其他 WSGI 服务器来使用它。不同之处在于,在使用 ASGI 服务器时,已经有一个运行循环,而在使用 WSGI 时需要创建一个。使用ASGI,您可以简单地定义async直接在views.py其视图类的继承函数下的函数。

假设您使用 ASGI,您有多种方法可以实现这一点,我将描述一些(asyncio.Queue例如可以使用其他选项):

  1. 使start()异步

通过start()async,您可以直接使用现有的运行循环,而通过使用asyncio.Task,您可以触发并忘记现有的运行循环。如果你想开火但记住,你可以创建另一个Task来跟进这个,即:

from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.conf import settings
from mycrawler import tasks

import asyncio

async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
    await tasks.update_all(deep_crawl, season, log_to_db)

async def follow_up_task(task: asyncio.Task):
    await asyncio.sleep(5) # Or any other reasonable number, or a finite loop...
    if task.done():
        print('update_all task completed: {}'.format(task.result()))
    else:
        print('task not completed after 5 seconds, aborting')
        task.cancel()


@api_view(['POST', 'GET'])
async def start(request):
    """
    Start crawling.
    """
    if request.method == 'POST':
        print("Crawler: start {}".format(request))

        deep = request.data.get('deep', False)
        season = request.data.get('season', settings.CURRENT_SEASON)

        # Once the task is created, it will begin running in parallel
        loop = asyncio.get_running_loop()
        task = loop.create_task(update_all_async(season=season, deep_crawl=deep))

        # Fire up a task to track previous down
        loop.create_task(follow_up_task(task))

        return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
    else:
        return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
            "deep": "boolean",
            "season": "number"
        }}, status.HTTP_200_OK)
Run Code Online (Sandbox Code Playgroud)
  1. 异步到同步

有时,您不能一开始就拥有一个async将请求路由到的函数,就像 DRF 一样(截至今天)。为此,Django 提供了一些有用的async适配器函数,但请注意,从同步上下文切换到异步上下文(反之亦然)会带来大约 1 毫秒的小性能损失。请注意,这一次,运行循环update_all_sync改为收集在函数中:

from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.conf import settings
from mycrawler import tasks

import asyncio
from asgiref.sync import async_to_sync

@async_to_sync
async def update_all_async(deep_crawl=True, season=settings.CURRENT_SEASON, log_to_db=True):
    #We can use the running loop here in this use case
    loop = asyncio.get_running_loop()
    task = loop.create_task(tasks.update_all(deep_crawl, season, log_to_db))
    loop.create_task(follow_up_task(task))

async def follow_up_task(task: asyncio.Task):
    await asyncio.sleep(5) # Or any other reasonable number, or a finite loop...
    if task.done():
        print('update_all task completed: {}'.format(task.result()))
    else:
        print('task not completed after 5 seconds, aborting')
        task.cancel()


@api_view(['POST', 'GET'])
def start(request):
    """
    Start crawling.
    """
    if request.method == 'POST':
        print("Crawler: start {}".format(request))

        deep = request.data.get('deep', False)
        season = request.data.get('season', settings.CURRENT_SEASON)

        # Make update all "sync"
        sync_update_all_sync = async_to_sync(update_all_async)
        sync_update_all_sync(season=season, deep_crawl=deep)

        return Response({"Success": {"crawl finished"}}, status=status.HTTP_200_OK)
    else:
        return Response ({"description": "Start the crawler by calling this enpoint via post.", "allowed_parameters": {
            "deep": "boolean",
            "season": "number"
        }}, status.HTTP_200_OK)
Run Code Online (Sandbox Code Playgroud)

在这两种情况下,该函数都会快速返回 200,但从技术上讲,第二个选项更慢。

重要提示:使用 Django 时,这些异步操作中涉及到数据库操作是很常见的。Django 中的 DB 操作只能是同步的,至少现在是这样,因此您必须在异步上下文中考虑这一点。sync_to_async()对于这些情况变得非常方便。


小智 5

在我看来,你应该看看celery,它是一个专门为异步任务设计的很棒的工具。它支持 Django,当您不希望用户在服务器上等待长时间操作时,它非常有用。在后台运行的每个任务都会收到一个task_id,如果您想创建另一个服务,该服务在给定task_id的情况下返回特定任务是否成功,或者到目前为止已经完成了多少,这可以帮助您。

  • 我以前曾使用过芹菜,但对于我的用例来说,它几乎似乎有点矫枉过正:)也许我仍然会尝试一下 (2认同)