Joh*_*dge 7 python django cron celery django-celery
一些上下文:我正在构建一个Django应用程序,允许用户预先保存一个动作,并安排他们希望所述动作执行的确切日期/时间.例如,安排一个帖子在下周早上5:30以编程方式推送到Facebook墙上.
我正在寻找一个可以处理一次性任务的一千个实例的任务调度系统,所有这些都设置为几乎同时执行(误差范围加上或减去一分钟).
我正在考虑使用Django-celery/Rabbitmq,但我注意到Celery文档没有解决一次性使用的任务.Django-celery在这里是正确的选择(也许是通过继承CrontabSchedule)或者我的能量更好地花在研究其他方法上吗?也许和Sched Module和Cron 一起乱砍.
编辑2:
出于某种原因,我的脑袋原本停留在重复任务的领域.这是一个更简单的解决方案.
您真正需要的是为每个用户操作定义一个任务.您可以跳过存储要在数据库中执行的任务 - 这就是芹菜的用途!
重新使用你的facebook帖子示例,并再次假设你有一个功能在post_to_facebook某个地方,它需要一个用户和一些文本,做一些魔术,并将文本发布到该用户的Facebook,你可以将它定义为这样的任务:
# Task to send one update.
@celery.task(ignore_result=True)
def post_to_facebook(user, text):
# perform magic
return whatever_you_want
Run Code Online (Sandbox Code Playgroud)
当用户准备将这样的帖子排队时,您只需告诉芹菜何时运行该任务:
post_to_facebook.apply_async(
(user, text), # args
eta=datetime.datetime(2012, 9, 15, 11, 45, 4, 126440) # pass execution options as kwargs
)
Run Code Online (Sandbox Code Playgroud)
这里有详细介绍,包括一大堆可用的看涨期权:http://docs.celeryproject.org/en/latest/userguide/calling.html#eta-and-countdown
如果需要调用结果,可以跳过任务定义中的ignore_result参数并返回AsyncResult对象,然后检查调用结果.更多信息:http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results
下面的一些答案仍然有用.您仍然希望为每个用户操作执行任务,您仍然需要考虑任务设计等,但这是一个更简单的方法来完成您的要求.
使用重复任务的原始答案如下:
Dannyroa有正确的想法.我会在这里建立一点.
编辑/ TLDR: 答案是肯定的,芹菜适合您的需要.您可能需要重新考虑您的任务定义.
我假设你不允许你的用户编写任意Python代码来定义他们的任务.除此之外,您必须预定义用户可以安排的某些操作,然后允许他们按照自己的喜好安排这些操作.然后,您可以为每个用户操作运行一个计划任务,检查条目并为每个条目执行操作.
一个用户操作:
使用您的Facebook示例,您可以将用户的更新存储在表格中:
class ScheduledPost(Model):
user = ForeignKey('auth.User')
text = TextField()
time = DateTimeField()
sent = BooleanField(default=False)
Run Code Online (Sandbox Code Playgroud)
然后,您将每分钟运行一个任务,检查计划在最后一分钟发布的表中的条目(基于您提到的错误边距).如果您按下一分钟窗口非常重要,则可以更频繁地安排任务,例如每30秒.任务可能如下所示(在myapp/tasks.py中):
@celery.task
def post_scheduled_updates():
from celery import current_task
scheduled_posts = ScheduledPost.objects.filter(
sent=False,
time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
time__lte=timezone.now()
)
for post in scheduled_posts:
if post_to_facebook(post.text):
post.sent = True
post.save()
Run Code Online (Sandbox Code Playgroud)
配置可能如下所示:
CELERYBEAT_SCHEDULE = {
'fb-every-30-seconds': {
'task': 'tasks.post_scheduled_updates',
'schedule': timedelta(seconds=30),
},
}
Run Code Online (Sandbox Code Playgroud)
其他用户操作:
对于除了发布到Facebook之外的每个用户操作,您还可以定义新表和新任务:
class EmailToMom(Model):
user = ForeignKey('auth.User')
text = TextField()
subject = CharField(max_length=255)
sent = BooleanField(default=False)
time = DateTimeField()
@celery.task
def send_emails_to_mom():
scheduled_emails = EmailToMom.objects.filter(
sent=False,
time__lt=timezone.now()
)
for email in scheduled_emails:
sent = send_mail(
email.subject,
email.text,
email.user.email,
[email.user.mom.email],
)
if sent:
email.sent = True
email.save()
CELERYBEAT_SCHEDULE = {
'fb-every-30-seconds': {
'task': 'tasks.post_scheduled_updates',
'schedule': timedelta(seconds=30),
},
'mom-every-30-seconds': {
'task': 'tasks.send_emails_to_mom',
'schedule': timedelta(seconds=30),
},
}
Run Code Online (Sandbox Code Playgroud)
速度和优化:
为了获得更多的吞吐量,post_scheduled_updates您可以生成一堆子任务并且并行执行(给定足够的工作人员),而不是迭代更新以在调用期间串行发送和发送它们.然后调用post_scheduled_updates非常快速地运行并安排一大堆任务 - 每个fb更新一个 - 以尽快运行.这看起来像这样:
# Task to send one update. This will be called by post_scheduled_updates.
@celery.task
def post_one_update(update_id):
try:
update = ScheduledPost.objects.get(id=update_id)
except ScheduledPost.DoesNotExist:
raise
else:
sent = post_to_facebook(update.text)
if sent:
update.sent = True
update.save()
return sent
@celery.task
def post_scheduled_updates():
from celery import current_task
scheduled_posts = ScheduledPost.objects.filter(
sent=False,
time__gt=current_task.last_run_at, #with the 'sent' flag, you may or may not want this
time__lte=timezone.now()
)
for post in scheduled_posts:
post_one_update.delay(post.id)
Run Code Online (Sandbox Code Playgroud)
我发布的代码未经过测试,当然也未经过优化,但它应该让您走上正轨.在您的问题中,您暗示了对吞吐量的一些担忧,因此您需要仔细查看要优化的位置.一个显而易见的是批量更新而不是迭代调用post.sent=True;post.save().
更多信息:
有关定期任务的更多信息:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html.
关于任务设计策略的一节:http://docs.celeryproject.org/en/latest/userguide/tasks.html#performance-and-strategies
这里有关于优化芹菜的整页:http://docs.celeryproject.org/en/latest/userguide/optimizing.html.
这个关于子任务的页面也可能很有趣:http://docs.celeryproject.org/en/latest/userguide/canvas.html.
事实上,我建议阅读所有芹菜文档.
| 归档时间: |
|
| 查看次数: |
1469 次 |
| 最近记录: |