使用Django REST Framework进行批量插入的最佳设计模式是什么?

Pau*_*l J 7 django celery django-rest-framework

背景

我有一个Django应用程序,允许通过Django REST框架插入记录.

记录将由询问电子表格和其他数据库的客户端应用程序逐行批量插入.REST API允许从Django中抽象出处理数据转换等的其他应用程序.

问题

我想将实际的记录插入与API分离,以提高容错能力和可扩展性.

建议的方法

我正在考虑用Celery做这件事,但我之前没用过它.我正在考虑覆盖perform_create()我现有的DRF ModelViewSets(perform_create()DRF 3.0中添加)来创建工作人员在后台抓取和处理的Celery任务.

DRF文档说perform_create()应该"应该通过调用serializer.save()来保存对象实例".我想知道,在我的情况下,我是否可以忽略此建议,而是让我的Celery任务调用适当的序列化程序来执行对象保存.

例如,我有几个型号:

class Book(models.Model):
    name = models.CharField(max_length=32)

class Author(models.Model):
    surname = models.CharField(max_length=32)
Run Code Online (Sandbox Code Playgroud)

我有这些模型的DRF视图和序列化器:

class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = Book

class AuthorSerializer(serializers.ModelSerializer):
    class Meta:
        model = Author

class BookViewSet(viewsets.ModelViewSet):
    queryset = Book.objects.all()
    serializer_class = Book

class AuthorViewSet(viewsets.ModelViewSet):
    queryset = Author.objects.all()
    serializer_class = Author
Run Code Online (Sandbox Code Playgroud)

在例如BookViewSet:覆盖perform_create()是一个好主意吗?

def perform_create(self, serializer):
    create_book_task(serializer.data)
Run Code Online (Sandbox Code Playgroud)

在哪里create_book_task分别是这样的:

@shared_task
def create_book_task(data):
    serializer = BookSerializer(data=data)
    serializer.save()
Run Code Online (Sandbox Code Playgroud)

我真的没能找到其他开发人员做类似事情或试图解决同样问题的任何例子.我是不是太复杂了?在物理插入方面,我的数据库仍然是限制因素,但至少它不会阻止API客户端排队数据.如果不合适,我不会对Celery承诺.这是最好的解决方案,它有明显的问题,还是有更好的选择?

Seb*_*ian 2

我发现你的方法是合理的,Celery 很棒,除了一些边界情况,根据我的经验,这些情况可能会有点令人讨厌(但我不希望在你在问题中概述的用例中遇到这种情况)。

但是,请考虑使用 Redis 的简化方法,如下所示。它有一些优点和缺点。

在 BookViewSet 中:

from redis import StrictRedis
from rest_framework import viewsets, renderers

redis_client = StrictRedis()

class BookViewSet(viewsets.ModelViewSet):
    queryset = Book.objects.all()
    serializer_class = Book

    def perform_create(self, serializer):
        json = renderers.JSONRenderer().render(serializer.data)
        redis_client.lpush('create_book_task', json)
Run Code Online (Sandbox Code Playgroud)

在单独的工作脚本中:

from django.utils.six import BytesIO
from redis import StrictRedis
from rest_framework.parsers import JSONParser
from myproject import BookSerializer, Book

MAX_BATCH_SIZE = 1000

def create_book_task():
    bookset = []
    for json in redis_client.brpop(('create_book_task',)):
       stream = BytesIO(json)
       data = JSONParser().parse(stream)
       serializer = BookSerializer(data=data)
       assert serializer.is_valid()
       bookset.append(serializer.instance)
       if len(bookset) >= MAX_BATCH_SIZE:
           break

    if len(bookset) > 0:
        Book.objects.bulk_create(bookset)

while True:
    create_book_task()
Run Code Online (Sandbox Code Playgroud)

优点

  • 您不需要添加 Celery(再次强调,喜欢它,但它使测试变得有点棘手,并且有时会根据工作负载、配置等而变得有点棘手)
  • 它处理批量创建,因此如果您在短时间内(几秒或不到一秒)提交了数千本书,则只会在数据库上执行少量插入(而不是数千次插入)

缺点

  • 你自己负责低级序列化,而不是 Celery “神奇地”做它
  • 您需要自己管理工作脚本(将其守护进程,可能将其打包为管理命令,负责重新启动等),而不是将其交给 Celery

当然,上述是第一种方法,您可能希望使其更通用以重用其他模型,将 MAX_BATCH_SIZE 移至您的设置,使用 pickling 而不是 JSON 或根据您的具体情况进行各种其他调整、改进或设计决策需要。

最后,我可能会采用我的答案中概述的方法,除非您预计还有其他几个任务将被卸载到异步处理,其中使用 Celery 的情况会变得更加充分。

PS:由于实际插入将异步完成,请考虑使用202 Accepted 响应代码进行响应(除非201 Created这会搞砸您的客户端)。