我正在寻找一些建议,以便将从任务生成的列表映射到芹菜中的另一个任务的最佳方法.
假设我有一个名为的任务parse,它解析PDF文档并输出一个页面列表.然后需要将每个页面单独传递给另一个名为的任务feed.这一切都需要进入一个叫做的任务process
所以,我能做到的一种方法是:
@celery.task
def process:
pages = parse.s(path_to_pdf).get()
feed.map(pages)
Run Code Online (Sandbox Code Playgroud)
当然,这不是一个好主意,因为我get()在一个任务中调用.
此外,这是低效的,因为我的parse任务包含在生成器函数中并且能够生成页面,这意味着应该可以在解析器生成最后一页之前对第一页进行排队.
另一种可能性是这样做:
@celery.task
def process:
for page in parse.s(path_to_pdf).get():
feed.delay(page)
Run Code Online (Sandbox Code Playgroud)
这个例子仍然涉及调用get()任务内部.此外,这个例子过于简单化,我真的需要在所有页面都被输入之后做一些事情(即在a中chord).
我正在寻找在芹菜中做到这一点的最佳方式.我很感激任何建议.
谢谢!
我想在正在运行的任务中获取任务ID,而不知道我在哪个任务.(这就是为什么我不能使用/sf/answers/566726051/)
我希望它是这样的:
@task
def my_task():
foo()
def foo():
logger.log(current_task_id)
Run Code Online (Sandbox Code Playgroud)
这种模式在许多不同的任务中返回,我不想将任务上下文带到每个内部方法调用.
一个选项可能是使用线程本地存储,但是我需要在任务启动之前初始化它,并在完成后清理它.
有什么比这更简单的了吗?
我现在正在尝试让 Celery 工作一段时间。当我同步测试时,我所有的 crontabs 工作得很好
sudo celery -A testdjango worker --loglevel=DEBUG --beat
Run Code Online (Sandbox Code Playgroud)
但是当我这样做的时候
celery multi start -A testdjango w1 -l info
Run Code Online (Sandbox Code Playgroud)
我的 crontab 都没有工作。我不知道为什么
注意:我也尝试了其他计划时间间隔,例如 withtime delta同样的事情也会发生。
所以我相当确定这不是 crontab 的事情,而是与我开始使用 celery multi 的方式有关。
此外,worker 启动得很好,因为我可以在Celery Flower 中看到它,但没有执行任何任务。
我正在尝试使用 Django 将 python 脚本作为芹菜任务运行。我遇到的问题是,一旦脚本开始运行,任务就认为它已完成。我最初在 tasks.py 文件中使用 subprocess.popen(),但意识到这意味着一旦发出 popen() 命令,任务就会完成。我修改了我的 tasks.py 代码以调用运行脚本的 python 脚本中的函数;但是,这仍然会像任务立即完成一样执行。我很困惑,因为在花中它说任务已完成,但在芹菜日志中,它正在输出我正在运行的脚本中定义的日志数据。我找到了以下相关帖子。我相信我正在遵循它的建议,从tasks.py 执行一个python 函数。
任务.py:
def exe(workDir, cancelRun):
sys.path.append(workDir)
import run
if cancelRun=='True':
task_id=exe.request.id
revoke(task_id,terminate=True)
else:
run.runModel(workDir)
task_id=exe.request.id
return task_id
Run Code Online (Sandbox Code Playgroud)
runModel 函数代码:
def runModel(scendir):
fullpath=scendir+'/run.py'
os.chdir(scendir)
p=Process(target=myMain,args=(scendir,))
p.start()
p.join()
Run Code Online (Sandbox Code Playgroud) 在我正在处理的应用程序中,用户可以执行"转换",其中包含"步骤".步骤可以对其他步骤具有任意数量的依赖性.我希望能够调用转换并将这些步骤作为单独的Celery任务并行执行.
理想情况下,我喜欢celery-tasktree的一些东西,除了一般的有向非循环图,而不仅仅是树,但似乎还没有这样的库存在.
首先想到的解决方案是对标准拓扑排序的并行调整 - 而不是确定满足依赖关系的步骤的线性排序,我们确定可以在开始时并行执行的整个步骤集,然后是可以在第2轮中执行的整个步骤集,依此类推.
但是,当任务占用不同的时间并且工作人员必须空闲等待较长时间运行的任务时,这不是最佳的,而现在有任务可以运行.(对于我的具体应用,此解决方案现在可能还不错,但我仍然想弄清楚如何优化它.)
如https://cs.stackexchange.com/questions/2524/getting-parallel-items-in-dependency-resolution中所述,更好的方法是直接在DAG上运行 - 在每个任务完成后,检查是否有任何依赖现在可以运行任务,如果是,则运行它们.
实现这样的事情最好的方法是什么?我不清楚有一种简单的方法可以做到这一点.
据我所知,Celery的组/链/和弦原语不够灵活,不足以让我表达完整的DAG - 虽然我可能在这里错了?
我想我可以为当前任务完成后通知相关任务的任务创建一个包装器 - 我不确定处理这种通知的最佳方法是什么.访问应用程序的Django数据库并不是特别简洁,并且很难将其转换为通用库,但Celery本身并没有为此提供明显的机制.
在我的应用程序层,我有一个包含8个队列的工作程序,每个队列包含来自不同类别的任务.每个芹菜任务将记录保存在按不同类别分类的服务器上的分布式数据库中.
例如: - 类别A的任务将在队列A中排队,然后转发并插入服务器A,依此类推.
如果任何服务器关闭,芹菜任务将导致失败.当它再次启动时,子服务器将ping应用层并准备好接收数据.然后,我想重新运行该特定服务器(类别)的所有失败任务,由于一些错误的原因导致失败.如何从特定队列查看和重新运行失败的任务?
我会看看Celery Flower API,但它没有让我选择从特定队列中获取所有失败任务.它反而给出了特定工人的失败任务.我该怎么做?
此外,请不要建议使用重试任务的选项default_retry_delay=5 * 60, max_retries=12.
我在Celery中有很多任务都使用了canvas chain.
@shared_task(bind=True)
def my_task_A(self):
try:
logger.debug('running task A')
do something
except Exception:
run common cleanup function
@shared_task(bind=True)
def my_task_B(self):
try:
logger.debug('running task B')
do something else
except Exception:
run common cleanup function
...
Run Code Online (Sandbox Code Playgroud)
到现在为止还挺好.问题是我正在寻找使用这样的通用实用程序函数的最佳实践:
def cleanup_and_notify_user(task_data):
logger.debug('task failed')
send email
delete folders
...
Run Code Online (Sandbox Code Playgroud)
没有任务阻止,最好的办法是什么?例如,我可以run common cleanup function用一个电话替换cleanup_and_notify_user(task_data)吗?如果来自多个工作人员的多个任务试图同时调用该函数会发生什么?
每个工人都有自己的副本吗?我显然对这里的一些概念感到困惑.任何帮助深表感谢.
谢谢大家.
我正在尝试将 Django Signals 的 post_save 函数与 Celery 任务结合使用。将新的 Message 对象保存到数据库后,我想评估该实例是否具有两个属性之一,如果有,则调用 'send_sms_function',它是 Celery 注册的任务。
任务.py
from my_project.celery import app
@app.task
def send_sms_message(message):
# Do something
Run Code Online (Sandbox Code Playgroud)
信号.py
from django.db.models.signals import post_save
from django.dispatch import receiver
import rollbar
rollbar.init('234...0932', 'production')
from dispatch.models import Message
from comm.tasks import send_sms_message
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
if instance.some_attribute == 'A' or instance.some_attribute == 'B':
try:
send_sms_message.delay(instance)
except:
rollbar.report_exc_info()
else:
pass
Run Code Online (Sandbox Code Playgroud)
我正在通过运行 Celery 工作器在本地测试它。当我在 Django shell 中调用 Celery 函数时,它按预期工作。但是,当我将 Message 实例保存到数据库时,该函数无法按预期工作:任务队列中没有发布任何内容,也没有看到任何错误消息。
我究竟做错了什么?
我有以下设置:
我在码头工人的容器中运行芹菜,所以当我重建码头工人时,芹菜工人没有优雅地关闭.这是可以的,如果没有确认任务,因为一旦工人再次在新的docker容器中,经纪人会将它们发回去,但在其他情况下,他们将会丢失.
在花管理面板中,预取任务已收到状态.
我仔细阅读了官方文档和相关问题,直觉上我觉得我的设置中的预取任务得到了认可.是这样吗?
我编写了一个模块,根据项目设置中的字典列表(通过导入django.conf.settings)动态添加定期芹菜任务.我这样做是使用一个函数add_tasks来调度一个函数,该函数使用uuid设置中给出的特定函数进行调用:
def add_tasks(celery):
for new_task in settings.NEW_TASKS:
celery.add_periodic_task(
new_task['interval'],
my_task.s(new_task['uuid']),
name='My Task %s' % new_task['uuid'],
)
Run Code Online (Sandbox Code Playgroud)
像这里建议我使用on_after_configure.connect信号来调用我的函数celery.py:
app = Celery('my_app')
@app.on_after_configure.connect
def setup_periodic_tasks(celery, **kwargs):
from add_tasks_module import add_tasks
add_tasks(celery)
Run Code Online (Sandbox Code Playgroud)
这个设置适用于两者celery beat,celery worker但在我用于uwsgi服务我的django应用程序的地方中断了我的设置.Uwsgi运行顺利,直到视图代码第一次使用celery的.delay()方法发送任务.在这一点上,似乎芹菜被初始化,uwsgi但在上面的代码中永远阻止.如果我从命令行手动运行它然后在它阻塞时中断,我得到以下(缩短的)堆栈跟踪:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
return obj.__dict__[self.__name__]
KeyError: 'tasks'
During handling of the above exception, another exception occurred: …Run Code Online (Sandbox Code Playgroud)