我有一个 celery 任务,它应该在无限循环中运行,监听 RabbitMQ 中的一些队列(与 Celery 内部无关)。当从队列中检索消息时,此长时间运行的任务会分派此消息以供其他任务处理。
如何在 Celery 中适当地实现这样的用例?
我以并发 3 和 Ofair 标志运行 celery。
我目前的观察是,几天后,此设置停止处理来自 celery 内部队列的任务。看来这个长时间运行的任务由于某种原因正在重新启动,最终所有 3 个工作人员都只忙于这个长时间运行的任务,因此没有留下工作人员来处理 celery 队列中的任务。
我考虑过一些基于文件的锁,以确保只有一个工作人员能够获得锁并成为这个长时间运行的任务,但不确定这是否是一个好的选择,我认为这个问题有更好的解决方案。
def init_couriers_consumers(self):
logger.info("lock acquired")
logger.info("TASK ID: {}".format(init_couriers_consumers.request.id))
with Connection('amqp://guest:guest@localhost:5672//') as conn:
couriers_consumer_worker = ConsumerWorker(conn)
couriers_consumer_worker.run()
couriers_consumer_worker.should_stop = False
# cache.set('reboot', False)
self.retry(countdown=2)
class ConsumerWorker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
self._create_queues()
def _create_queues(self):
from courier.models import Courier
self.queues = []
logger.info("create_queues")
for courier in Courier.objects.filter(user__is_active=True):
logger.info("create_queue for courier: {}".format(courier.user.username))
self._create_courier_queues(courier.user.username)
def _create_courier_queues(self, courier_username):
self.queues.append(QueuesFactory.get_consumer_order_status_queue(courier_username))
self.queues.append(QueuesFactory.get_consumer_status_queue(courier_username))
self.queues.append(QueuesFactory.get_consumer_gps_queue(courier_username)) …Run Code Online (Sandbox Code Playgroud) 编辑:我刚刚意识到,当我不打算打印到控制该变量时,它可以工作.为什么?
我遇到了一个与使用utf字符显示字符串标签相关的问题.我在uwsgi ini文件中设置了locale env,如下所示:
env =LC_ALL=en_US.UTF-8
env =LANG=en_US.UTF-8
Run Code Online (Sandbox Code Playgroud)
在wsgi.py中:
locale.setlocale(locale.LC_ALL, 'en_US.UTF-8')
Run Code Online (Sandbox Code Playgroud)
当我运行应用程序代码时:
print (locale.getlocale(), locale.getpreferredencoding())
print locale.getdefaultlocale()
print "option_value", option_value
label = force_text(option_label)
print 'label', label #THIS FAILS
Run Code Online (Sandbox Code Playgroud)
输出是:
(('en_US', 'UTF-8'), 'UTF-8')
('en_US', 'UTF-8')
option_value d
ERROR <stack trace>
print 'label', label
UnicodeEncodeError: 'ascii' codec can't encode character u'\u015b' in position 5: ordinal not in range(128)
Run Code Online (Sandbox Code Playgroud)
当我在生产环境中通过runserver运行app时,问题不存在.Django 1.6.5 Python 2.7.6 Ubuntu 14.04 uWSGI 2.0.5.1