Django批处理/批量更新_or_create?

bin*_*ker 15 python database django orm

我在数据库中有数据需要进行peridocially更新.数据源返回在该时间点可用的所有内容,因此将包含数据库中尚未存在的新数据.

当我遍历源数据时,我不希望在可能的情况下进行1000次单独写入.

有什么比如update_or_create批量生产吗?

一种想法是update_or_create与手动事务结合使用,但我不确定这是否只是将单个写入排队或是否将它们全部合并到一个SQL插入中?

或者类似地可以@commit_on_success()在一个函数update_or_create内部使用循环工作?

除了翻译数据并将其保存到模型之外,我没有对数据做任何事情.没有什么依赖于循环期间存在的模型

Rei*_*ica 44

从 Django 4.1 开始,该bulk_create方法支持通过更新插入update_conflicts,这是单个查询,批处理相当于update_or_create

class Foo(models.Model):
    a = models.IntegerField(unique=True)
    b = models.IntegerField()

objects = [Foo(1, 1), Foo(1, 2)]

Foo.objects.bulk_create(
    objects, 
    update_conflicts=True,
    unique_fields=['a'],
    update_fields=['b'],
)
Run Code Online (Sandbox Code Playgroud)


Zag*_*ags 12

由于 Django 添加了对bulk_update 的支持,现在这在某种程度上是可能的,尽管每批需要执行 3 次数据库调用(一次获取、一次批量创建和一次批量更新)。在这里为通用函数建立一个良好的接口有点具有挑战性,因为您希望该函数既支持高效的查询又支持更新。这是我实现的一种方法,该方法专为批量 update_or_create 而设计,其中您有许多通用标识键(可能为空)和一个在批次之间有所不同的标识键。

这是作为基本模型上的方法实现的,但可以独立使用。这还假设基本模型auto_now在名为 的模型上有一个时间戳updated_on;如果不是这种情况,则假定这种情况的代码行已被注释以便于修改。

为了批量使用它,请在调用它之前将更新分块。这也是一种绕过可以具有少量辅助标识符值之一的数据的方法,而无需更改接口。

class BaseModel(models.Model):
    updated_on = models.DateTimeField(auto_now=True)
    
    @classmethod
    def bulk_update_or_create(cls, common_keys, unique_key_name, unique_key_to_defaults):
        """
        common_keys: {field_name: field_value}
        unique_key_name: field_name
        unique_key_to_defaults: {field_value: {field_name: field_value}}
        
        ex. Event.bulk_update_or_create(
            {"organization": organization}, "external_id", {1234: {"started": True}}
        )
        """
        with transaction.atomic():
            filter_kwargs = dict(common_keys)
            filter_kwargs[f"{unique_key_name}__in"] = unique_key_to_defaults.keys()
            existing_objs = {
                getattr(obj, unique_key_name): obj
                for obj in cls.objects.filter(**filter_kwargs).select_for_update()
            }
            
            create_data = {
                k: v for k, v in unique_key_to_defaults.items() if k not in existing_objs
            }
            for unique_key_value, obj in create_data.items():
                obj[unique_key_name] = unique_key_value
                obj.update(common_keys)
            creates = [cls(**obj_data) for obj_data in create_data.values()]
            if creates:
                cls.objects.bulk_create(creates)

            # This set should contain the name of the `auto_now` field of the model
            update_fields = {"updated_on"}
            updates = []
            for key, obj in existing_objs.items():
                obj.update(unique_key_to_defaults[key], save=False)
                update_fields.update(unique_key_to_defaults[key].keys())
                updates.append(obj)
            if existing_objs:
                cls.objects.bulk_update(updates, update_fields)
        return len(creates), len(updates)

    def update(self, update_dict=None, save=True, **kwargs):
        """ Helper method to update objects """
        if not update_dict:
            update_dict = kwargs
        # This set should contain the name of the `auto_now` field of the model
        update_fields = {"updated_on"}
        for k, v in update_dict.items():
            setattr(self, k, v)
            update_fields.add(k)
        if save:
            self.save(update_fields=update_fields)
Run Code Online (Sandbox Code Playgroud)

用法示例:

class Event(BaseModel):
    organization = models.ForeignKey(Organization)
    external_id = models.IntegerField(unique=True)
    started = models.BooleanField()


organization = Organization.objects.get(...)
updates_by_external_id = {
    1234: {"started": True},
    2345: {"started": True},
    3456: {"started": False},
}
Event.bulk_update_or_create(
    {"organization": organization}, "external_id", updates_by_external_id
)
Run Code Online (Sandbox Code Playgroud)

可能的竞争条件

上面的代码利用事务和选择更新来防止更新时的竞争条件。然而,如果两个线程或进程试图创建具有相同标识符的对象,则插入时可能存在竞争条件。

简单的缓解方法是确保 common_keys 和 unique_key 的组合是数据库强制的唯一性约束(这是此函数的预期用途)。这可以通过使用 unique_key 引用字段来实现unique=True,或者将 unique_key 与由 UniqueConstraint 强制执行为唯一的 common_key 子集相结合。通过数据库强制的唯一性保护,如果多个线程尝试执行冲突的创建,则除了一个线程之外的所有线程都将失败并返回IntegrityError. 由于封闭事务,失败的线程不会执行任何更改,并且可以安全地重试或忽略(失败的冲突创建只能被视为首先发生然后立即被覆盖的创建)。

如果无法利用唯一性约束,那么您将需要实现自己的并发控制或锁定整个表


Pau*_*aul 3

批量更新将是一个 upsert 命令,就像 @imposeren 所说,Postgres 9.5 为您提供了这种能力。我认为 Mysql 5.7 也可以(请参阅http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html),具体取决于您的具体需求。也就是说,使用数据库游标可能是最简单的。这没有什么问题,当 ORM 还不够的时候就可以使用它。

沿着这些思路的东西应该有效。这是伪代码,所以不要只是剪切粘贴它,但这个概念已经为你准备好了。

class GroupByChunk(object):
    def __init__(self, size):
        self.count = 0
        self.size = size
        self.toggle = False

    def __call__(self, *args, **kwargs):
        if self.count >= self.size:  # Allows for size 0
            self.toggle = not self.toggle
            self.count = 0
        self.count += 1
        return self.toggle

def batch_update(db_results, upsert_sql):
    with transaction.atomic():
        cursor = connection.cursor()   
        for chunk in itertools.groupby(db_results, GroupByChunk(size=1000)):
            cursor.execute_many(upsert_sql, chunk)
Run Code Online (Sandbox Code Playgroud)

这里的假设是:

  • db_results是某种结果迭代器,位于列表或字典中
  • 结果db_results可以直接输入到原始 sql exec 语句中
  • 如果任何批量更新失败,您将回滚所有更新。如果您想将其移动到每个块,只需将with块向下推一点即可