在*Large*Django QuerySet中限制内存使用

Chr*_* W. 22 python django memory-management django-queryset

我有一个任务需要每隔一段时间(每天一次,每周一次,无论如何)在我的数据库中的"大多数"对象上运行一次.基本上这意味着我有一些看起来像这样运行在它自己的线程中的查询.

for model_instance in SomeModel.objects.all():
    do_something(model_instance)
Run Code Online (Sandbox Code Playgroud)

(请注意,它实际上是一个filter()而不是所有()但是我仍然最终选择了一组对象.)

我遇到的问题是,运行一段时间之后线程被我的托管服务提供商杀死,因为我使用了太多内存.我假设所有这些内存使用正在发生,因为即使QuerySet我的查询返回的对象最初具有非常小的内存占用量,它最终会随着QuerySet对象model_instance在迭代它们时缓存每个内容而增长.

我的问题是," SomeModel以内存有效的方式迭代几乎每个数据库中的最佳方法是什么?" 或者我的问题是"如何从django查询集中取消缓存'模型实例?"

编辑:我实际上使用查询集的结果来构建一系列新对象.因此,我最终不会更新查询的对象.

Chr*_* W. 17

所以我实际上最终做的是构建一些可以"封装"QuerySet的东西.它通过使用切片语法对QuerySet进行some_queryset[15:45]深度复制来实现- 例如, - 然后它对原始QuerySet进行另一次深度复制切片完全迭代后.这意味着只有'this'特定切片中返回的对象集才存储在内存中.

class MemorySavingQuerysetIterator(object):

    def __init__(self,queryset,max_obj_num=1000):
        self._base_queryset = queryset
        self._generator = self._setup()
        self.max_obj_num = max_obj_num

    def _setup(self):
        for i in xrange(0,self._base_queryset.count(),self.max_obj_num):
            # By making a copy of of the queryset and using that to actually access
            # the objects we ensure that there are only `max_obj_num` objects in
            # memory at any given time
            smaller_queryset = copy.deepcopy(self._base_queryset)[i:i+self.max_obj_num]
            logger.debug('Grabbing next %s objects from DB' % self.max_obj_num)
            for obj in smaller_queryset.iterator():
                yield obj

    def __iter__(self):
        return self

    def next(self):
        return self._generator.next()
Run Code Online (Sandbox Code Playgroud)

而不是......

for obj in SomeObject.objects.filter(foo='bar'): <-- Something that returns *a lot* of Objects
    do_something(obj);
Run Code Online (Sandbox Code Playgroud)

你会做...

for obj in MemorySavingQuerysetIterator(in SomeObject.objects.filter(foo='bar')):
    do_something(obj);
Run Code Online (Sandbox Code Playgroud)

请注意,这样做的目的是节省Python解释中的内存.它主要通过进行更多数据库查询来实现.通常人们试图与此完全相反 - 即尽可能地减少数据库查询而不考虑内存使用情况.希望有人会觉得这很有用.


mpa*_*paf 13

如何使用django core的Paginator和Page对象记录在这里:

https://docs.djangoproject.com/en/dev/topics/pagination/

像这样的东西:

from django.core.paginator import Paginator
from djangoapp.models import SomeModel

paginator = Paginator(SomeModel.objects.all(), 1000) # chunks of 1000

for page_idx in range(1, paginator.num_pages):
    for row in paginator.page(page_idx).object_list:
        # here you can do what you want with the row
    print "done processing page %s" % page_idx
Run Code Online (Sandbox Code Playgroud)

  • 应该是`for page_idx in range(1, paginator.num_pages+1):` 否则你会跳过最后一页 (4认同)
  • 为什么这不是公认的答案?因为它是本机django解决方案,而且看起来是最少的工作即可完成工作 (2认同)
  • 出于好奇,这不是类似于 `iterator()` 吗?https://docs.djangoproject.com/en/2.1/ref/models/querysets/#iterator 实际上,Paginator 会先调用 `count`(否则为 len)。它不会稍微降低效率吗?为什么这个选项比使用迭代器更好? (2认同)

小智 12

您不能简单地使用Model.objects.all().iterator(),因为它将立即获取您表上的所有元素.你不能简单地使用Model.objects.all()[offset:offset + pagesize]方式,因为它会捕获你的结果.任何这些都将超出你的记忆限制.

我试过混合两种解决方案,它起作用了:

offset = 0
pagesize = 1000
count = Model.objects.all().count()
while offset < count:
    for m in Model.objects.all()[offset : offset + pagesize].iterator:
        do_something with m
    offset += pagesize
Run Code Online (Sandbox Code Playgroud)

更改pagesize以满足您的要求,并且如果它更适合您,可以将[offset:offset + pagesize]更改为[offset*pagesize:(offset + 1)*pagesize]惯用法.当然,也可以用实际的型号名称替换Model.

  • QuerySet.iterator() 将使用偏移量逐一获取行而不缓存结果,具体取决于数据库。文档中特别指出 Postgres 和 Oracle 可以做到这一点。 (3认同)
  • "抓住你的结果"是什么意思? (2认同)
  • 缓存我假设 (2认同)
  • 我不知道为什么父答案谴责迭代器的使用,因为(正如 @Derek 所指出的) `.iterator(batch_size=N)` 似乎完全符合问题的要求......至少在 Postgres 的服务器端光标?例如,我有一个查询,它可以返回大约 1000 行(模型实例),其中每行包含大约 20MB 的 JSON 数据。如果没有 .iterator(),峰值内存使用量将远远超过 20GB。使用 .iterator(batch_size=N) 完全解决了这个问题,该迭代器将内存与数据库往返进行交换。 (2认同)

drs*_*drs 8

许多解决方案实现sql OFFSETLIMIT通过切片查询集.正如斯特凡诺指出的那样,对于更大的数据集,这变得非常低效.处理此问题的正确方法是使用服务器端光标来跟踪OFFSET.

本机服务器端游标支持正在为django工作.在它准备就绪之前,如果您使用带有psycopg2后端的postgres,这是一个简单的实现:

def server_cursor_query(Table):
    table_name = Table._meta.db_table

    # There must be an existing connection before creating a server-side cursor
    if connection.connection is None:
        dummy_cursor = connection.cursor()  # not a server-side cursor

    # Optionally keep track of the columns so that we can return a QuerySet. However,
    # if your table has foreign keys, you may need to rename them appropriately
    columns = [x.name for x in Table._meta.local_fields]

    cursor = connection.connection.cursor(name='gigantic_cursor')) # a server-side
                                                                   # cursor

    with transaction.atomic():
        cursor.execute('SELECT {} FROM {} WHERE id={}'.format(
            ', '.join(columns), table_name, id))

        while True:
            rows = cursor.fetchmany(1000)

                if not rows:
                    break

                for row in rows:
                    fields = dict(zip(columns, row))
                    yield Table(**fields)
Run Code Online (Sandbox Code Playgroud)

有关django中大型查询的内存问题的详细解释,请参阅此博客文章.


Zag*_*ags 8

简答

如果您使用 PostgreSQL 或 Oracle,则可以使用 Django 的内置迭代器

queryset.iterator(chunk_size=1000)
Run Code Online (Sandbox Code Playgroud)

这会导致 Django在迭代查询集时使用服务器端游标而不是缓存模型。从 Django 4.1 开始,这甚至可以与prefetch_related.

对于其他数据库,您可以使用以下内容:

def queryset_iterator(queryset, page_size=1000):
    page = queryset.order_by("pk")[:page_size]
    while page:
        for obj in page:
            yield obj
            pk = obj.pk
        page = queryset.filter(pk__gt=pk).order_by("pk")[:page_size]
Run Code Online (Sandbox Code Playgroud)

如果您想要返回页面而不是单个对象以与其他优化(例如 )结合bulk_update使用,请使用以下命令:

def queryset_to_pages(queryset, page_size=1000):
    page = queryset.order_by("pk")[:page_size]
    while page:
        yield page
        pk = max(obj.pk for obj in page)
        page = queryset.filter(pk__gt=pk).order_by("pk")[:page_size]
Run Code Online (Sandbox Code Playgroud)

PostgreSQL 性能分析

我在 Django 3.2 和 Postgres 13 上对大约 200,000 行的 PostgreSQL 表分析了多种不同的方法。对于每个查询,我将 ids 的总和相加,既确保 Django 实际检索对象,也使我能够验证查询之间迭代的正确性。所有计时都是在对相关表进行多次迭代后进行的,以最大限度地减少后续测试的缓存优势。

基本迭代

基本方法只是迭代表。这种方法的主要问题是所使用的内存量不是恒定的;它随着表的大小而增长,并且我已经看到在较大的表上内存不足。

x = sum(i.id for i in MyModel.objects.all())
Run Code Online (Sandbox Code Playgroud)

挂壁时间:3.53 秒,22MB 内存(BAD)

Django迭代器

Django 迭代器(至少从 Django 3.2 开始)修复了内存问题,并带来了较小的性能提升。据推测,这是由于 Django 管理缓存的时间减少了。

assert sum(i.id for i in MyModel.objects.all().iterator(chunk_size=1000)) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:3.11 秒,<1MB 内存

自定义迭代器

自然的比较点是尝试通过逐渐增加对主键的查询来自己进行分页。虽然这是对简单迭代的改进,因为它具有恒定的内存,但它实际上在速度上输给了 Django 的内置迭代器,因为它进行了更多的数据库查询。

def queryset_iterator(queryset, page_size=1000):
    page = queryset.order_by("pk")[:page_size]
    while page:
        for obj in page:
            yield obj
            pk = obj.pk
        page = queryset.filter(pk__gt=pk).order_by("pk")[:page_size]

assert sum(i.id for i in queryset_iterator(MyModel.objects.all())) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:3.65 秒,<1MB 内存

自定义分页功能

使用自定义迭代的主要原因是您可以在页面中获取结果。此函数对于在仅使用常量内存时插入批量更新非常有用。在我的测试中,它比 queryset_iterator 慢一点,而且我没有一个连贯的理论来解释为什么,但速度减慢并不严重。

def queryset_to_pages(queryset, page_size=1000):
    page = queryset.order_by("pk")[:page_size]
    while page:
        yield page
        pk = max(obj.pk for obj in page)
        page = queryset.filter(pk__gt=pk).order_by("pk")[:page_size]

assert sum(i.id for page in queryset_to_pages(MyModel.objects.all()) for i in page) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:4.49 秒,<1MB 内存

替代自定义分页功能

鉴于 Django 的查询集迭代器比我们自己进行分页更快,因此可以交替实现查询集分页器来使用它。它比我们自己进行分页要快一点,但实现起来比较混乱。可读性很重要,这就是为什么我个人更喜欢前一个分页功能,但如果您的查询集在结果中没有主键(无论出于何种原因),这个功能可能会更好。

def queryset_to_pages2(queryset, page_size=1000):
    page = []
    page_count = 0
    for obj in queryset.iterator():
        page.append(obj)
        page_count += 1
        if page_count == page_size:
            yield page
            page = []
            page_count = 0
    yield page

assert sum(i.id for page in queryset_to_pages2(MyModel.objects.all()) for i in page) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:4.33 秒,<1MB 内存


不良方法

以下是您永远不应该使用的方法(问题中建议了其中许多方法)以及原因。

不要对无序查询集使用切片

无论你做什么,都不要对无序查询集进行切片。这不能正确地迭代表。 原因是切片操作根据您的查询集执行 SQL 限制 + 偏移量查询,并且 django 查询集没有顺序保证,除非您使用order_by. 此外,PostgreSQL 没有默认的 order by,并且Postgres 文档特别警告不要在没有 order by 的情况下使用 limit + offset。因此,每次获取切片时,您都会获得表的不确定性切片,这意味着您的切片可能不会重叠,并且不会覆盖它们之间表的所有行。根据我的经验,只有当您在进行迭代时有其他东西正在修改表中的数据时,才会发生这种情况,这只会使这个问题更加严重,因为这意味着如果您单独测试代码,则该错误可能不会出现。

def very_bad_iterator(queryset, page_size=1000):
    counter = 0
    count = queryset.count()
    while counter < count:     
        for model in queryset[counter:counter+page_size].iterator():
            yield model
        counter += page_size

assert sum(i.id for i in very_bad_iterator(MyModel.objects.all())) == x
Run Code Online (Sandbox Code Playgroud)

断言错误;即计算结果不正确!

一般情况下不要使用切片进行全表迭代

即使我们对查询集进行排序,从性能角度来看,列表切片也是很糟糕的。这是因为 SQL offset 是线性时间操作,这意味着表的 limit + offset 分页迭代将是二次时间,这是您绝对不希望的。

def bad_iterator(queryset, page_size=1000):
    counter = 0
    count = queryset.count()
    while counter < count:     
        for model in queryset.order_by("id")[counter:counter+page_size].iterator():
            yield model
        counter += page_size

assert sum(i.id for i in bad_iterator(MyModel.objects.all())) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:15 秒(BAD),<1MB 内存

不要使用 Django 的分页器进行全表迭代

Django 带有一个内置的分页器。人们可能会认为这适合对数据库进行分页迭代,但事实并非如此。Paginator 的目的是将单页结果返回到 UI 或 API 端点。它比任何迭代表的好方法都要慢得多。

from django.core.paginator import Paginator

def bad_paged_iterator(queryset, page_size=1000):
    p = Paginator(queryset.order_by("pk"), page_size)
    for i in p.page_range:
        yield p.get_page(i)
        
assert sum(i.id for page in bad_paged_iterator(MyModel.objects.all()) for i in page) == x
Run Code Online (Sandbox Code Playgroud)

挂载时间:13.1 秒(BAD),<1MB 内存


小智 5

有一个 django 片段:

http://djangosnippets.org/snippets/1949/

它通过生成原始查询集的较小“块”的行来迭代查询集。它最终使用的内存显着减少,同时允许您调整速度。我在我的一个项目中使用它。