我希望能够中止从Celery队列运行的任务(使用rabbitMQ).我用这个叫任务
task_id = AsyncBoot.apply_async(args=[name], name=name, connect_timeout=3)
Run Code Online (Sandbox Code Playgroud)
其中AsyncBoot是已定义的任务.
我可以获取任务ID(假设这是apply_async返回的长字符串)并将其存储在数据库中,但我不确定如何调用中止方法.我看到如何使Abortable tasks类使方法不可用,但如果我只有task-id字符串,我该如何在任务上调用.abort()?谢谢.
我正在使用芹菜,我想使用max-tasks-per-child-setting, 因为一些芹菜进程会占用大量内存.
我想在更改之前找到此设置的默认值,但我无法找到该信息.
我看了这里,但我不想设置它,1因为我不希望它重新启动每个任务.
我已经在3台机器上安装了celery + rabbitmq.我还创建了一个任务,它根据文件中的数据生成正则表达式,并使用该信息来解析文本.
from celery import Celery
celery = Celery('tasks', broker='amqp://localhost//')
import re
@celery.task
def add(x, y):
return x + y
def get_regular_expression():
with open("text") as fp:
data = fp.readlines()
str_re = "|".join([x.split()[2] for x in data ])
return str_re
@celery.task
def analyse_json(tw):
str_re = get_regular_expression()
re.match(str_re,tw.text)
Run Code Online (Sandbox Code Playgroud)
我可以使用以下python代码轻松调用此任务: -
from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x)
Run Code Online (Sandbox Code Playgroud)
但是,现在我想从Java而不是python进行相同的调用.我不确定做同样事情的最简单方法是什么.
我已经编写了这段代码,用于向AMQP代理发送消息.代码运行正常,但任务没有执行.我不知道如何指定应该执行的任务的名称.
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
class try1 {
public …Run Code Online (Sandbox Code Playgroud) 我有一个多租户设置,我想将某些客户特定信息,特别是request.host传递给芹菜任务,理想情况下,它应该在全局变量中可用.有没有办法以对应用程序透明的方式设置它?
任务将以相同的方式调用:
my_background_func.delay(foo, bar)
Run Code Online (Sandbox Code Playgroud)
任务的定义方式相同,只是它可以访问名为'request'的全局变量,该变量具有属性'host':
@celery_app.task
def my_background_func(foo, bar):
print "running the task for host:" + request.host
Run Code Online (Sandbox Code Playgroud) 在项目中,我尝试轮询一个长时间运行的任务的task.state并更新其运行状态.它在开发中起作用,但是当我在生产服务器上移动项目时它不起作用.即使我可以看到任务开始在花上,我仍然不停地'待命'.但是,当任务完成时,我仍然可以更新结果,当task.state =='SUCCESS'时.我在生产中使用python 2.6,Django 1.6和Celery 3.1,结果后端AMQP.
@csrf_exempt
def poll_state(request):
data = 'Fail'
if request.is_ajax():
if 'task_id' in request.POST.keys() and request.POST['task_id']:
task_id = request.POST['task_id']
email = request.POST['email']
task = AsyncResult(task_id)
print "task.state=", task.state
if task.state == 'STARTED':
task_state = 'Running'
data = 'Running'
#data = 'Running'
elif task.state == 'PENDING' or task.state == 'RETRY':
task_state = 'Waiting'
data = 'Pending'
elif task.state == 'SUCCESS':
task_state = 'Finished'
if task.result:
data = task.result
else:
data = 'None'
else:
task_state = task.state
data = …Run Code Online (Sandbox Code Playgroud) 我有许多Celery任务,它们是长期运行的进程.因此,我想实现自定义状态以查询其进度.
根据文档,为给定任务实现自定义状态很容易.
def download_count(wget_base_path):
# recursively traverse root folder and return count of files
return sum([len(files) for r, d, files in os.walk(wget_base_path)])
@app.task(bind = True)
def html_download(self, url='', cl_id=-1):
log = get_logger(__name__)
...
# wget download location
wget_base_path = settings.WGET_PATH + str(cl_id)
os.system(wget_cmd)
if not self.request.called_directly:
log.debug('State progress called')
self.update_state(state = 'PROGRESS', meta = {'item_count' : download_count(wget_base_path)})
Run Code Online (Sandbox Code Playgroud)
现在,当我打电话给这个时
from app.ingest.tasks import html
ingest = html.html_download.delay(url, 54431)
Run Code Online (Sandbox Code Playgroud)
这项工作按预期开始.但是每当我尝试获得更新状态时,我都不会获得任何元数据.
例如,
In [6]: ingest.state
Out[6]: 'PENDING'
In [10]: ingest._get_task_meta()
Out[10]: {'result': …Run Code Online (Sandbox Code Playgroud) celery工作者(Flask应用程序的一部分)在开始之前被杀死:
Celery配置参数(Windows,Celery 3.1.25,Rabbitmq(最新))
flask_app = Flask(__name__)
flask_app.secret_key = 'some_secret'
flask_app.config['CELERY_BROKER_URL'] = 'amqp://localhost/'
flask_app.config['CELERY_RESULT_BACKEND'] = 'amqp://localhost/'
flask_app.config['CELERY_ACCEPT_CONTENT'] = ['json']
flask_app.config['CELERY_TASK_SERIALIZER'] = 'json'
flask_app.config['CELERY_RESULT_SERIALIZER'] = 'json'
flask_app.config['CELERY_IGNORE_RESULT'] = True
flask_app.config['CELERY_ROUTES'] = {'task': {'queue': 'agent_queue'}}
flask_app.config['CELERY_IMPORTS'] = ['Monitor.app']
Run Code Online (Sandbox Code Playgroud)
结果:
-------------- celery-01 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Windows-7-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: MonitorSetup.app:0x4aad030
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * …Run Code Online (Sandbox Code Playgroud) 我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。
我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。
celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)
一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。
但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。
以下是我启动时工作人员的输出。
$ celery -A proj worker -Q A -c 1 -l INFO
-------------- celery@pet_sms v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x7f298a10d208
- ** ---------- .> transport: redis://localhost:6379/0
- ** …Run Code Online (Sandbox Code Playgroud) 芹菜3.0
经纪人= RabbitMQ
任务已经被确认并开始处理,并且具有state=STARTED。然后,我想重新启动工作程序(以将工作程序更新为较新的版本)。重新启动worker(使用supervisorctl restart)后,那些长时间运行的任务将全部终止。但是他们的状态仍然在state=STARTED。如何将其状态更新为FAILURE其他值?(而且,我不希望在工作程序重新启动后再次执行这些任务。)
track_started=True---如果使用此选项,则state=STARTED在工作线程重新启动后,任务将保留在此处。如果没有此选项,则state=PENDING在工作程序重新启动后,任务将保留。CELERY_ACKS_LATE=True--- state=STARTED工作重新启动后,任务将保留在。并且再次执行任务,而不是期望的行为。signal(SIGTERM, handler)和处理程序函数来捕获信号。可以成功输入处理程序。但是,无论我将什么内容放入处理程序中,都无法更改任务的状态。状态保持不变,不会更改为FAILURE。在我尝试过的处理程序中
raise Exceptionexit(0)exit(1)Celery是否有任何设置可以使其跟踪正在关闭的任务状态?
我有一个Celery相当长的任务。超过几分钟。
有时,由于各种原因,一个工作人员被标记为终止,而另一个工作人员则开始工作。如果需要更换运行它的计算机,或者正在部署新的代码版本,则可能会发生这种情况。在这种情况下,工作线程会收到 SIGTERM 信号。
我想知道任务本身是否可以定期检查该工作线程是否已收到 SIGTERM 并且正在等待终止,在这种情况下,只需将任务放回队列中并终止即可。(然后该任务将在另一个工作人员上启动,并继续执行其工作)
编辑:澄清 - 是否可以在任务中检查它是否在等待终止的工作线程上执行。像这样:
# Some long task that can take even a few hours.
def some_task(...):
for i in range(...):
do_some_work()
# That's the missing function:
if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
# stop the task in the middle, and it will be executed again later
Run Code Online (Sandbox Code Playgroud) celery ×10
celery-task ×10
python ×8
django ×2
rabbitmq ×2
celerybeat ×1
flask ×1
java ×1
python-2.7 ×1
python-2.x ×1
task ×1