我的应用程序使用范围会话和SQLALchemy的声明式样式.它是一个Web应用程序,许多数据库插入由Celery任务调度程序执行.
通常,在决定插入对象时,我的代码可能会执行以下操作:
from schema import Session
from schema.models import Bike
pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
new_bike = Bike(pk, "shiny", "bike")
Session.add(new_bike)
Session.commit()
Run Code Online (Sandbox Code Playgroud)
这里的问题是,由于很多,这是通过异步工人完成的,它是可能的一个工作是,虽然中途插入Bike有id=123,而另一个正在检查它的存在.在这种情况下,第二个worker将尝试插入一个具有相同主键的行,SQLAlchemy将引发一个IntegrityError.
我不能为我的生活找到一个很好的方法来处理这个问题,除了交换Session.commit():
'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())
def commit(ignore=False):
try:
Session.commit()
except IntegrityError as e:
reason = e.message
logger.warning(reason)
if not ignore:
raise e
if "Duplicate entry" in reason:
logger.info("%s already in …Run Code Online (Sandbox Code Playgroud) 我存储task_id从celery.result.AsyncResult数据库中,它涉及到的任务影响的项目.这允许我执行查询以检索task_id与特定项目相关的所有任务.
因此,task_id从数据库中检索后,如何检索有关任务状态/结果/等的信息?
我试图找到相当于Java环境的Celery项目,我已经看过Spring Batch,但是对于分布式任务队列有没有更好的选择.
谢谢.
我最近切换到Celery 3.0.在此之前,我使用Flask-Celery将Celery与Flask整合在一起.虽然它有许多问题,比如隐藏了一些强大的Celery功能,但它允许我使用Flask app的完整上下文,尤其是Flask-SQLAlchemy.
在我的后台任务中,我正在处理数据,而SQLAlchemy ORM则用于存储数据.Flask-Celery的维护者已经放弃了对该插件的支持.该插件在任务中挑选了Flask实例,因此我可以完全访问SQLAlchemy.
我试图在我的tasks.py文件中复制此行为,但没有成功.你有任何关于如何实现这一目标的提示吗?
我在我的Django应用程序(在Elastic Beanstalk上)使用Celery和RabbitMQ来管理后台任务,我使用Supervisor对其进行了守护.现在的问题是,我定义的一个期间任务失败了(在一周工作正常之后),我得到的错误是:
[01/Apr/2014 23:04:03] [ERROR] [celery.worker.job:272] Task clean-dead-sessions[1bfb5a0a-7914-4623-8b5b-35fc68443d2e] raised unexpected: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/opt/python/run/venv/lib/python2.7/site-packages/billiard/pool.py", line 1168, in mark_as_worker_lost
human_status(exitcode)),
WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
Run Code Online (Sandbox Code Playgroud)
由主管管理的所有流程都正常运行(supervisorctl statusRUNNNING说).
我尝试在ec2实例上读取几个日志,但似乎没有人帮我找出SIGKILL的原因.我该怎么办?我该如何调查?
这些是我的芹菜设置:
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
BROKER_URL = os.environ['RABBITMQ_URL']
CELERY_IGNORE_RESULT = True
CELERY_DISABLE_RATE_LIMITS = False
CELERYD_HIJACK_ROOT_LOGGER = False
Run Code Online (Sandbox Code Playgroud)
这是我的supervisord.conf:
[program:celery_worker]
environment=$env_variables
directory=/opt/python/current/app
command=/opt/python/run/venv/bin/celery worker -A com.cygora -l info --pidfile=/opt/python/run/celery_worker.pid
startsecs=10
stopwaitsecs=60
stopasgroup=true
killasgroup=true
autostart=true
autorestart=true
stdout_logfile=/opt/python/log/celery_worker.stdout.log …Run Code Online (Sandbox Code Playgroud) django amazon-ec2 celery supervisord amazon-elastic-beanstalk
我有两种任务:Type1 - 一些高优先级的小任务.Type2 - 优先级较低的繁重任务.
最初我使用默认路由进行简单配置,但没有使用路由密钥.这还不够 - 有时候所有工作人员都在忙于Type2任务,所以Task1被推迟了.我添加了路由键:
CELERY_DEFAULT_QUEUE = "default"
CELERY_QUEUES = {
"default": {
"binding_key": "task.#",
},
"highs": {
"binding_key": "starter.#",
},
}
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
CELERY_ROUTES = {
"search.starter.start": {
"queue": "highs",
"routing_key": "starter.starter",
},
}
Run Code Online (Sandbox Code Playgroud)
所以现在我有2个队列 - 具有高优先级和低优先级的任务.
问题是 - 如何使用不同的并发设置启动2 celeryd?
以前celery用于守护进程模式(根据这个),所以只/etc/init.d/celeryd start 需要启动,但现在我必须运行具有不同队列和并发性的2个不同的芹菜.我该怎么做?
有谁知道你可以在redis中存储的最大值大小是多少?我想将redis用作celery的消息队列来存储一些需要由另一台服务器上的worker处理的小文档,我想确保文档不会太大.
我找到了一个引用1GB的页面,但当我按照页面上的链接获取该答案的链接时,链接无效了.链接在这里:
http://news.ycombinator.com/item?id=1182005
谢谢,肯
我有一个python应用程序,我想在后台开始做更多工作,以便它变得更加繁忙,因为它变得更加繁忙.在过去,我使用Celery来完成正常的后台任务,这一点运作良好.
这个应用程序和我过去做过的其他应用程序之间的唯一区别是我需要保证这些消息得到处理,它们不会丢失.
对于这个应用程序,我不太关心我的消息队列的速度,我首先需要可靠性和耐用性以及formost.为了安全起见,我希望有两个队列服务器,两个都在不同的数据中心,以防出现问题,一个是另一个的备份.
看看Celery看起来它支持一堆不同的后端,有些后端具有更多功能.两个最流行的看起来像redis和RabbitMQ所以我花了一些时间来进一步检查它们.
RabbitMQ: 支持持久队列和群集,但是它们今天进行群集的方式的问题是,如果丢失群集中的节点,则该节点中的所有消息都将不可用,直到您将该节点重新联机为止.它不会复制群集中不同节点之间的消息,只是复制有关消息的元数据,然后返回到原始节点以获取消息,如果节点未运行,则为SOL Not理想.
他们建议解决这个问题的方法是设置第二台服务器并使用DRBD复制文件系统,然后运行pacemaker之类的东西,以便在需要时将客户端切换到备份服务器.这似乎很复杂,不确定是否有更好的方法.谁知道更好的方法?
Redis: 支持一个读取从站,这将允许我在紧急情况下备份,但它不支持主 - 主设置,我不确定它是否处理主站和从站之间的主动故障转移.它没有RabbitMQ那样的功能,但看起来更容易设置和维护.
问题:
设置芹菜的最佳方法是什么,以保证消息处理.
有没有人这样做过?如果是这样,会分享您的所作所为吗?
我们来看一个简单的Django示例.
应用程序/ models.py
from django.db import models
from django.contrib.auth.models import User
class UserProfile(models.Model):
user = models.OneToOneField(User)
token = models.CharField(max_length=32)
Run Code Online (Sandbox Code Playgroud)
应用程序/ views.py
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from forms import RegisterForm
from utils.utilities import create_user
@csrf_exempt
def register_view(request):
if request.method == 'POST':
form = RegisterForm(request.POST)
if form.is_valid():
create_user(form.cleaned_data)
return HttpResponse('success')
Run Code Online (Sandbox Code Playgroud)
utils的/ utilities.py
def create_user(data):
user = User.objects.create_user(username=data['username'], email=None, password=data['password'])
user.save()
profile = UserProfile()
profile.user = user
profile.token = generate_token()
profile.save()
Run Code Online (Sandbox Code Playgroud)
在这个例子中,有人可以提供Celery的实现吗?想象一下,这是一个每秒有数百个请求的大型项目.
我对AMQP的理解是消息只有以下组件:
队列附加到交易所.消息不能具有任何队列知识.它们只是发布到交换机,然后根据交换类型和路由密钥,消息被路由到一个或多个队列.
在Celery中,推荐的路由任务方法是通过CELERY_ROUTES设置.来自文档,CELERY_ROUTES是......
路由器列表,或用于将任务路由到队列的单个路由器. http://celery.readthedocs.org/en/latest/configuration.html#message-routing
它包括一个例子......
要将任务路由到feed_tasks队列,可以在
CELERY_ROUTES设置中添加条目 :Run Code Online (Sandbox Code Playgroud)CELERY_ROUTES = { 'feeds.tasks.import_feed': { 'queue': 'feed_tasks', 'routing_key': 'feed.import', }, }
但是等一下 - 根据AMQP,消息只带有路由密钥!什么是"排队"在那里做什么?
此外,还有一个默认队列的概念.如果您调用未被捕获的任务CELERY_ROUTES,则会回退到CELERY_DEFAULT_QUEUE.但是再次 - 在AMQP中,消息不知道队列.这不应该是默认的路由密钥吗?
celery ×10
python ×6
django ×3
redis ×2
amazon-ec2 ×1
amqp ×1
flask ×1
java ×1
mysql ×1
rabbitmq ×1
scalability ×1
spring-batch ×1
sqlalchemy ×1
supervisord ×1