处理 webhook 时的并发问题

sea*_*son 5 django postgresql concurrency python-3.x

我们的应用程序根据外部服务的网络钩子创建/更新数据库条目。Webhook 发送对象的外部 ID,以便我们可以获取更多数据进行处理。为了获取更多数据而进行往返的 Webhook 的处理时间为 400-1200 毫秒。

有时,同一对象 ID 的多个挂钩在彼此之间的微秒内发送。以下是最近发生的时间戳:

2020-11-21 12:42:45.812317+00:00
2020-11-21 20:03:36.881120+00:00 <-
2020-11-21 20:03:36.881119+00:00 <-
Run Code Online (Sandbox Code Playgroud)

大约在这个时间,还可能发送其他对象进行处理。问题在于,上面突出显示的两个挂钩的并发处理将为同一个对象创建两个新的数据库条目。

问:防止同时处理两个突出显示的条目的最佳方法是什么?

我的尝试: 目前,在传入挂钩开始时,我在存储对象 ID 的 Changes 表中创建一个数据库条目。在处理之前,会检查 Changes 表中是否有在过去 10 秒内为此 ID 创建的条目;如果找到一个进程,它就会退出,让另一个进程完成工作。

在上面的例子中,创建了两个数据库条目,因为它们的时间非常接近,所以它们同时到达检测点,找到对方,然后退出,导致什么也没做。

我想过在检查之前添加一些抖动超时(增加处理时间),或锁定表(再次增加处理时间),但这一切都感觉我正在打一场错误的仗。

有什么建议么?

我们的 API 是带有 Postgres 数据库的 Django 3.1

Rei*_*ica 1

如果您查看acquity webhook 文档,它们会提供一个名为 的字段action,该字段是使您的 webhook幂等的关键。以下是我可以挽救的报价:

操作计划 重新安排 取消 更改 或 order.completed 取决于发起 webhook 调用的操作

不同的动作

  • Scheduled 在最初预约时调用一次
  • 当约会重新安排到新时间时,将调用 rescheduled
  • 每当取消预约时都会调用cancel
  • 当约会以任何方式改变时,changed 会被调用。这包括最初安排、重新安排或取消的时间,以及更新电子邮件地址或入学表格等预约详细信息的时间。
  • order.completed 在订单完成时调用

根据措辞,我假设scheduledcanceled和每个object_idorder.completed都是唯一的,这意味着您可以对这些消息使用唯一的共同约束:

class AcquityAction(models.Model):
    id = models.CharField(max_length=17, primary_key=True)

class AcquityTransaction(models.Model):
    action = models.ForeignKey(AcquityAction, on_delete=models.PROTECT)
    object_id = models.IntegerField()

    class Meta:
        unique_together = [['object_id', 'action_id']]
Run Code Online (Sandbox Code Playgroud)

如果您愿意,您可以用AcquityAction模型代替Enumeration Field,但我更喜欢将它们放在数据库中。

我会完全忽略该事件,因为根据他们模糊的定义,它似乎会在每个change事件上触发。对于该事件,我将创建一个模型,允许您对新日期使用唯一约束,如下所示:rescheduled

class Reschedule(models.Model):
    schedule = models.ForeignKey(MyScheduleModel, on_delete=models.CASCADE)
    schedule_date = models.DateTimeField()

    class Meta:
        unique_together = [['schedule', 'schedule_date']]
Run Code Online (Sandbox Code Playgroud)

或者,您可以有一个专门用于使用重新安排的日期更新计划模型的任务,这样它就保持幂等性

现在在你看来,你会做这样的事情:

from django.db import IntegrityError

ACQUITY_ACTIONS = {'scheduled', 'canceled', 'order.completed'}

def webhook_view(request):
    validate(request)
    action = get_action(request)
    
    if action in ACQUITY_ACTIONS:
        try:
            insert_transaction()
        except IntegrityError:
            return HttpResponse(200)

        webhook_task.delay()

    elif action == 'rescheduled':
        other_webhook_task.delay()

    ...
Run Code Online (Sandbox Code Playgroud)