我想在链中使用一个组(或块),例如:
chain(getRange.s(3), GROUP() , xsum.s() )
Run Code Online (Sandbox Code Playgroud)
GROUP()一组double()任务在哪里,即group(double(0),double(1),double(2)).在如何链接一个将列表返回到组中的Celery任务中发布了类似的问题?但是没有解释如何将组中的输出传递给链中的下一个任务.
@task
def getRange(x):
return range(x)
@task
def double(nr):
return nr*2
@task
def xsum(list):
return sum(list)
Run Code Online (Sandbox Code Playgroud) 首先,请不要考虑这个问题作为一个重复 这个问题
我有一个设置环境,使用celery和redisas broker和result_backend.我的问题是如何确保当芹菜工人崩溃时,所有计划的任务都会重新尝试,当芹菜工人回来时.
我已经看到了使用的建议CELERY_ACKS_LATE = True,以便代理将重新驱动任务,直到它得到一个ACK,但在我的情况下它不起作用.每当我安排一项任务时,它会立即发送到工作人员,该工作人员将其持续到预定的执行时间.让我举个例子:
我正在安排这样的任务:res=test_task.apply_async(countdown=600),但是在芹菜工人日志中,我可以看到类似的东西:Got task from broker: test_task[a137c44e-b08e-4569-8677-f84070873fc0] eta:[2013-01-...].现在,当我杀死芹菜工人时,这些预定的任务就会丢失.我的设置:
BROKER_URL = "redis://localhost:6379/0"
CELERY_ALWAYS_EAGER = False
CELERY_RESULT_BACKEND = "redis://localhost:6379/0"
CELERY_ACKS_LATE = True
Run Code Online (Sandbox Code Playgroud) 我用过芹菜MySQL.我想将任务id存储为数据库或celery变量中的普通整数task.我怎样才能做到这一点?
我有几个芹菜任务,包含在我的Django测试中.不幸的是,当通过.delay()调用任务时,不会抛出异常.我将CELERY_ALWAYS_EAGER设置为True.
tasks.py
import celeryapp as app
@app.task()
def exception_task():
print 'CELERY_ALWAYS_EAGER:', app.conf['CELERY_ALWAYS_EAGER']
raise Exception('foo')
Run Code Online (Sandbox Code Playgroud)
tests.py
def test_exception_in_task(self):
from tasks import exception_task
exception_task.delay()
Run Code Online (Sandbox Code Playgroud)
产量
CELERY_ALWAYS_EAGER: True
.
----------------------------------------------------------------------
Ran 1 test in 0.686s
Run Code Online (Sandbox Code Playgroud)
删除.delay时,测试将退出并出现错误:
ERROR: test_exception_in_task
Exception: foo
Run Code Online (Sandbox Code Playgroud)
版本
celery==3.1.4
Django==1.6.4
Run Code Online (Sandbox Code Playgroud) 我最近开始致力于分布式计算,以提高计算速度.我选择了芹菜.但是,我对某些术语不太熟悉.所以,我有几个相关的问题.
来自Celery文档:
什么是任务队列?
...
Celery通过消息进行通信,通常使用经纪人在客户和工人之间进行调解.为了启动任务,客户端向队列添加消息,然后代理将该消息传递给工作者.
什么是客户(这里)?什么是经纪人?为什么通过经纪人传递消息?为什么Celery会使用后端和队列进行进程间通信?
当我通过发出命令执行Celery控制台时
celery worker -A tasks --loglevel=info --concurrency 5
Run Code Online (Sandbox Code Playgroud)
这是否意味着Celery控制台是一个工作进程,它负责5个不同的进程并跟踪任务队列?当新任务被推入任务队列时,此工作人员是否将任务/作业分配给5个进程中的任何进程?
我需要更换我redis与经纪人SQS的经纪人,而googleing它,我碰到很多页面,告诉如何使用SQS带celery.根据我的理解,它创建自己的SQS队列,我只有一个任务,并希望使用已经创建的SQS队列.
当任何芹菜任务排队时,我想添加工作人员将能够使用的上下文元数据。
以下代码示例有效,但我想要一个合适的芹菜式解决方案。
from celery.signals import before_task_publish, task_prerun
@before_task_publish.connect
def receiver_before_task_publish(sender=None, headers=None, body=None, **kwargs):
task_kwags = body[1]
metadata = {"foo": "bar"}
task_kwags['__metadata__'] = metadata
@task_prerun.connect
def receiver_task_pre_run(task_id, task, *args, **kwargs):
metadata = kwargs['kwargs'].pop('__metadata__', {})
# metadata == {"foo": "bar"}
Run Code Online (Sandbox Code Playgroud) Celery - 底线:我想通过使用任务 id 来获取任务名称(我没有任务对象)
假设我有这个代码:
res = chain(add.s(4,5), add.s(10)).delay()
cache.save_task_id(res.task_id)
Run Code Online (Sandbox Code Playgroud)
然后在其他地方:
task_id = cache.get_task_ids()[0]
task_name = get_task_name_by_id(task_id) #how?
print(f'Some information about the task status of: {task_name}')
Run Code Online (Sandbox Code Playgroud)
我知道如果我有一个任务对象,我可以得到任务名称,比如这里:celery: get function name by task id? . 但是我没有任务对象(也许它可以由 task_id 或其他方式创建?我在文档中没有看到任何与此相关的内容)。
另外,我不想在缓存中保存任务名称。(假设我有一个很长的链/其他 celery 原语,我不想保存它们的所有名称/task_ids。仅最后一个 task_id 应该足以获取有关所有任务的所有信息,使用 .parents 等)
我查看了 AsyncResult 和 AsyncResult.Backend 对象的所有相关方法。唯一似乎相关的是 backend.get_task_meta(task_id),但它不包含任务名称。提前致谢
PS:AsyncResult.name 总是返回 None:
result = AsyncResult(task_id, app=celery_app)
result.name #Returns None
result.args #Also returns None
Run Code Online (Sandbox Code Playgroud) 我无法理解 Celery 队列和撤销。我的主要问题是,如果创建了具有相同名称和参数的新任务,如何撤销(停止执行)队列中已有的任务。所以在伪代码中
new_task_to_be_added = some_task(1,2)
if exists some_task in queue where some_task.args = (1,2):
remove such task from queue
new_task_to_be_added.add_to_queue()
Run Code Online (Sandbox Code Playgroud)
这与密码重置机制相关 - 用户单击“创建临时密码”,并有 5 分钟的时间使用此密码创建新的永久密码。如果此时未设置新密码,则帐户应被锁定。但是,如果在 4 分钟后用户创建了一个新的临时密码,他应该再次有 5 分钟的时间来更改它,但“锁定帐户”任务将在 1 分钟内触发 - 我想停止它并始终只使用最新的任务(使用相同参数)
这不是以下内容的副本: Revoke celerytaskswithsameargs/kwargs 这实际上是相反的问题
我有一个以下项目目录:
azima:
__init.py
main.py
tasks.py
Run Code Online (Sandbox Code Playgroud)
任务.py
from .main import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
Run Code Online (Sandbox Code Playgroud)
主要.py
from celery import Celery
app = Celery('azima', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0', include=['azima.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
Run Code Online (Sandbox Code Playgroud)
当我运行命令来初始化 celery 工作人员时,出现以下错误:
(azima_venv) brightseid@darkseid:~$ celery -A azima worker -l INFO
Usage: celery [OPTIONS] COMMAND [ARGS]...
Error: Invalid value for '-A' / …Run Code Online (Sandbox Code Playgroud) celery ×10
celery-task ×10
python ×5
django ×3
amazon-sqs ×1
asynchronous ×1
broker ×1
redis ×1
testing ×1
worker ×1