我们正在使用 Django Celery 进行后台数据处理,获取 CSV 文件(最大 15MB),将其转换为 dict 数据列表(其中还包括一些 Django 模型对象),并将其分解为块以在子任务中处理:
@task
def main_task(data):
i = 0
for chunk in chunk_up(data):
chunk_id = "chunk_id_{}".format(i)
cache.set(chunk_id, chunk, timeout=FIVE_HOURS)
sub_task.delay(chunk_id)
i += 1
@task
def sub_task(chunk_id):
data_chunk = cache.get(chunk_id)
... # do processing
Run Code Online (Sandbox Code Playgroud)
所有任务都在由 Celery 管理的后台并发进程中运行。我们最初使用Redis后端,但发现它在峰值负载场景和高并发情况下经常会出现内存不足的情况。所以我们切换到Django 的基于文件的缓存后端。虽然这解决了内存问题,但我们发现 20-30% 的缓存条目从未被写入。没有抛出错误,只是无声的失败。当我们返回并从 CLI 查找缓存时,我们看到例如 chunk_id_7 和 chunk_id_9 会存在,但 chunk_id_8 不会存在。因此,间歇性地,某些缓存条目无法保存。
我们交换了磁盘缓存后端并观察到同样的情况,尽管缓存故障似乎减少到 5-10%(非常粗略的估计)。
我们注意到过去Django 基于文件的缓存存在并发进程问题,但它似乎在很多年前就已得到修复(我们使用的是 v1.11)。一条评论说这个缓存后端更像是一个 POC,尽管再次不确定它从那时起是否发生了变化。
基于文件的缓存是生产质量的缓存解决方案吗?如果是,什么可能导致我们的写入失败?如果没有,对于我们的用例来说,什么是更好的解决方案?
我有一个字符串:
"foo hello world baz 33"
Run Code Online (Sandbox Code Playgroud)
foo和之间的部分baz将是一些空格分隔的单词(一个或多个).我希望将此字符串与将分组每个单词的re匹配:
>>> re.match(r'foo (<some re here>) baz (\d+)', "foo hello world baz 33").groups()
('hello', 'world', '33')
Run Code Online (Sandbox Code Playgroud)
re应该是灵活的,以便它可以在周围没有单词的情况下工作:
>>> re.match(r'(<some re here>)', "hello world").groups()
('hello', 'world')
Run Code Online (Sandbox Code Playgroud)
我正在尝试使用变体([\w+\s])+,但我无法捕获动态确定数量的组.这可能吗?
我是第一次将远程工作人员连接到我的Celery服务器(Django).在我的服务器上,我为用户创建了一个新的用户名和密码,并设置了权限:
# rabbitmqctl add_user adcelery pwd
# rabbitmqctl set_permissions adcelery "^adcelery-.*" ".*" ".*"
# rabbitmqctl list_users
Listing users ...
guest [administrator]
adcelery []
...done.
# /etc/init.d/rabbitmq-server restart
# /etc/init.d/celeryd restart
Run Code Online (Sandbox Code Playgroud)
我的远程工作者的URL:
BROKER_URL = "amqp://adcelery:pwd@mydomain.com/"
Run Code Online (Sandbox Code Playgroud)
我在远程工作者的启动时收到以下错误.当我在BROKER_URL上面设置"guest:guest"作为我的登录时,它连接完全正常.我确定我错过了一两步,有什么建议吗?
[2014-01-12 11:31:26,188: INFO/MainProcess] Connected to amqp://adcelery@awaaz.de:5672//
[2014-01-12 11:31:26,391: ERROR/MainProcess] Unrecoverable error: AccessRefused(403, u"ACCESS_REFUSED - access to exchange 'celeryev' in vhost '/' refused f
or user 'adcelery'", (40, 10), 'Exchange.declare')
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/worker/__init__.py", line 206, in start
self.blueprint.start(self)
File "/usr/local/lib/python2.7/dist-packages/celery/bootsteps.py", …Run Code Online (Sandbox Code Playgroud) 将芹菜文档描述了如何通过位置参数,以您的节拍计划任务列表或元组.
我有一个任务,只需一个参数,一个整数列表:
@shared_task
def schedule_by_ids(ids):
...
Run Code Online (Sandbox Code Playgroud)
我的celerybeat时间表如下:
CELERYBEAT_SCHEDULE = {
'schedule_by_ids': {
'task': 'myproj.app.tasks.schedule_by_ids',
'schedule': crontab(minute='*/10', hour='8-21'),
'args': ([1,]),
},
}
Run Code Online (Sandbox Code Playgroud)
我的任务失败,"int不可迭代" TypeError.根据我的显示器(芹菜花),args传递为[1].
当我将args作为列表时,例如[[1]],arg显示在监视器中,[[1]]并且它工作正常.
我的问题是:当它是一个元组时,它是如何通过args的?为什么?
我有一个模特:
class Call(models.Model):
created_on = models.DateTimeField(auto_now_add=True)
send_on = models.DateTimeField(default=datetime.now)
...
survey = models.ForeignKey(Survey, null=True, blank=True)
Run Code Online (Sandbox Code Playgroud)
和相应的ModelSerializer:
class CallSerializer(serializers.HyperlinkedModelSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='call-detail',
)
recipient = PhoneNumberField(read_only=False)
status = SurveySerializer(source='survey', read_only=True)
class Meta:
model = Call
fields = ('url', 'id', 'send_on', ...)
lookup_field= 'pk'
Run Code Online (Sandbox Code Playgroud)
并查看:
class CallList(generics.ListCreateAPIView):
serializer_class = CallSerializer
permission_classes = (permissions.IsAuthenticatedOrReadOnly, IsOwnerOrSuperuser,)
def pre_save(self, obj):
...
def get_queryset(self):
"""
This view should return a list of all the calls
for the currently authenticated user.
"""
...
Run Code Online (Sandbox Code Playgroud)
我希望能够支持API列表请求,让调用者Call按日期范围过滤 …