带有元数据的 Celery 任务 - 在哪里存储结果?

jra*_*amm 6 python persistence mongodb celery

我有一个长时间运行的芹菜任务,它会输出一些文件。任务的结果是存储文件的文件夹位置和运行时间。

例如:

@app.task()
def my_task():
   start = time.time()
   folder = my_heavy_function()
   end = time.time()
   return json.dumps({ 'folder': folder, 'time': end-start })
Run Code Online (Sandbox Code Playgroud)

在异步调用任务之前,我将元数据存储在 mongodb 中 - 启动任务的用户、描述等。

例如

def endpoint(req, resp, user, description):

   doc = MyMongoDbDocument(
             user=user
             description=description
         )
   task = my_task.delay()
   doc.task_id = task.id
   doc.save()
   resp.status = HTTP_OK
   resp.body = doc.to_json()
Run Code Online (Sandbox Code Playgroud)

为了在这种情况下保持结果,我可以想到两种可能的方法:

  1. 使用celery后端来存储结果(例如redis)。为了将结果链接到元数据,请将任务 ID 存储在元数据中。

  2. 让任务使用结果数据更新元数据。

这里的最佳实践是什么?我可以看到这两种情况的优点和缺点:

  1. 这意味着客户端必须进行两次调用才能检索与任务相关的所有信息。1 获取元数据和任务 id,另一个获取某个端点,该端点将获取AsyncResult任务 id 并返回结果数据。这也意味着更少的数据局部性。相关数据位于不同的地方。另一方面,这意味着没有数据和代码位置的重复 - 即我没有访问不同位置的(元数据)数据库。

  2. 1次调用即可获取所有数据。但是,这意味着将 mongodb 文档 ID 传递给任务,打开另一个到数据库的连接并更新它。这里可能存在写入冲突的可能性。另外,即使假设所有数据库访问代码都在某个库中被整齐地抽象出来,我认为它并不像 1 那样 DRY。