Heroku上的Django Celery任务导致高内存使用率

Avi*_*eir 9 python django heroku django-celery

我在Heroku上有一个celery任务,它连接到一个外部API并检索一些数据,存储在数据库中并重复几百次.非常快(在~10次循环之后)Heroku开始警告高内存使用率.有任何想法吗?

tasks.py

@app.task
def retrieve_details():
    for p in PObj.objects.filter(some_condition=True):
        p.fetch()
Run Code Online (Sandbox Code Playgroud)

models.py

def fetch(self):
    v_data = self.service.getV(**dict(
        Number=self.v.number
    ))
    response = self.map_response(v_data)

    for key in ["some_key","some_other_key",]:
        setattr(self.v, key, response.get(key))

    self.v.save()
Run Code Online (Sandbox Code Playgroud)

Heroky记录

2017-01-01 10:26:25.634
132 <45>1 2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)

Go to the log: https://api.heroku.com/myapps/xxx@heroku.com/addons/logentries

You are receiving this email because your Logentries alarm "Memory quota exceeded"
has been triggered.

In context:
2017-01-01 10:26:25.568 131 <45>1 2017-01-01T10:26:25.457354+00:00 heroku run.5891 - - Process running mem=595M(116.2%)
2017-01-01 10:26:25.634 132 <45>1 2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)
Run Code Online (Sandbox Code Playgroud)

rde*_*ges 7

你基本上是将一堆数据加载到内存中的Python字典中.这将导致大量内存开销,尤其是当您从本地数据库中获取大量对象时.

你真的需要将所有这些对象存储在字典中吗?

大多数人为这样的事情做的是:

  • 一次从数据库中检索一个对象.
  • 处理该项目(执行您需要的任何逻辑).
  • 重复.

这样,您最终只能在任何给定时间将单个对象存储在内存中,从而大大减少内存占用.

如果我是你,我会寻找将逻辑转移到数据库查询中的方法,或者只是单独处理每个项目.


2ps*_*2ps 5

为了扩展真正的 rdegges 想法,以下是我过去在使用 celery/python 时使用的两种策略,以帮助减少内存占用:(1) 启动每个只处理一个对象的子任务和/或 (2) 使用发电机。

\n\n
    \n
  1. 启动子任务,每个子任务只处理一个对象:

    \n\n
    @app.task\ndef retrieve_details():\n    qs = PObj.objects.filter(some_condition=True)\n    for p in qs.values_list(\'id\', flat=True):\n        do_fetch.delay(p)\n\n@app.task\ndef do_fetch(n_id):\n    p = PObj.objects.get(id=n_id)\n    p.fetch()\n
    Run Code Online (Sandbox Code Playgroud)\n\n

    现在,您可以调整 celery,使其在处理 N 个 PObj(任务)后杀死进程,以使用--max-tasks-per-child.

  2. \n
  3. 使用生成器:您也可以尝试使用生成器,这样您就可以(理论上)在调用 fetch 后丢弃 PObj

    \n\n
    def ps_of_interest(chunk=10):\n    n = chunk\n    start = 0\n    while n == chunk:\n        some_ps = list(PObj.objects.filter(some_condition=True)[start:start + n])\n        n = len(some_ps)\n        start += chunk\n        for p in some_ps:\n            yield p\n\n@app.task\ndef retrieve_details():\n    for p in ps_of_interest():\n        p.fetch()\n
    Run Code Online (Sandbox Code Playgroud)
  4. \n
\n\n

为了我的钱,我\xe2\x80\x99d选择选项#1。

\n