标签: celery-task

在django中的Celery中止一个正在运行的任务

我希望能够中止从Celery队列运行的任务(使用rabbitMQ).我用这个叫任务

task_id = AsyncBoot.apply_async(args=[name], name=name, connect_timeout=3)
Run Code Online (Sandbox Code Playgroud)

其中AsyncBoot是已定义的任务.

我可以获取任务ID(假设这是apply_async返回的长字符串)并将其存储在数据库中,但我不确定如何调用中止方法.我看到如何使Abortable tasks类使方法不可用,但如果我只有task-id字符串,我该如何在任务上调用.abort()?谢谢.

python django rabbitmq celery celery-task

6
推荐指数
1
解决办法
2869
查看次数

python芹菜max-tasks-per-child-setting默认值

我正在使用芹菜,我想使用max-tasks-per-child-setting, 因为一些芹菜进程会占用大量内存.

我想在更改之前找到此设置的默认值,但我无法找到该信息.

我看了这里,但我不想设置它,1因为我不希望它重新启动每个任务.

python celery celery-task

6
推荐指数
1
解决办法
4794
查看次数

如何从Java等非python语言调用芹菜任务延迟函数?

我已经在3台机器上安装了celery + rabbitmq.我还创建了一个任务,它根据文件中的数据生成正则表达式,并使用该信息来解析文本.

from celery import Celery

celery = Celery('tasks', broker='amqp://localhost//')
import re

@celery.task
def add(x, y):
     return x + y


def get_regular_expression():
    with open("text") as fp:
        data = fp.readlines()
    str_re = "|".join([x.split()[2] for x in data ])
    return str_re    



@celery.task
def analyse_json(tw):
    str_re = get_regular_expression()
    re.match(str_re,tw.text) 
Run Code Online (Sandbox Code Playgroud)

我可以使用以下python代码轻松调用此任务: -

from tasks import analyse_tweet_json
x = tweet ## load from a file (x is a json)
analyse_tweet_json.delay(x) 
Run Code Online (Sandbox Code Playgroud)

但是,现在我想从Java而不是python进行相同的调用.我不确定做同样事情的最简单方法是什么.

我已经编写了这段代码,用于向AMQP代理发送消息.代码运行正常,但任务没有执行.我不知道如何指定应该执行的任务的名称.

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

class try1 {
public …
Run Code Online (Sandbox Code Playgroud)

python java rabbitmq celery celery-task

6
推荐指数
1
解决办法
1753
查看次数

将Web请求上下文透明地传递给芹菜任务

我有一个多租户设置,我想将某些客户特定信息,特别是request.host传递给芹菜任务,理想情况下,它应该在全局变量中可用.有没有办法以对应用程序透明的方式设置它?

任务将以相同的方式调用:

my_background_func.delay(foo, bar)
Run Code Online (Sandbox Code Playgroud)

任务的定义方式相同,只是它可以访问名为'request'的全局变量,该变量具有属性'host':

@celery_app.task
def my_background_func(foo, bar):
    print "running the task for host:" + request.host
Run Code Online (Sandbox Code Playgroud)

celery flask celery-task flask-extensions

6
推荐指数
1
解决办法
857
查看次数

即使在任务开始后,AsyncResult(task_id)也会返回"PENDING"状态

在项目中,我尝试轮询一个长时间运行的任务的task.state并更新其运行状态.它在开发中起作用,但是当我在生产服务器上移动项目时它不起作用.即使我可以看到任务开始在花上,我仍然不停地'待命'.但是,当任务完成时,我仍然可以更新结果,当task.state =='SUCCESS'时.我在生产中使用python 2.6,Django 1.6和Celery 3.1,结果后端AMQP.

@csrf_exempt
def poll_state(request):
    data = 'Fail'

    if request.is_ajax():
            if 'task_id' in request.POST.keys() and request.POST['task_id']:
                    task_id = request.POST['task_id']
                    email = request.POST['email']
                    task = AsyncResult(task_id)
                    print "task.state=", task.state
                    if task.state == 'STARTED':
                            task_state = 'Running'
                            data = 'Running'
                            #data = 'Running'
                    elif task.state == 'PENDING' or task.state == 'RETRY':
                            task_state = 'Waiting'
                            data = 'Pending'
                    elif task.state == 'SUCCESS':
                            task_state = 'Finished'
                            if task.result:
                                    data = task.result
                            else:
                                    data = 'None'

                    else:
                            task_state = task.state
                            data = …
Run Code Online (Sandbox Code Playgroud)

python django celery celery-task django-celery

6
推荐指数
2
解决办法
2092
查看次数

芹菜任务 - 如何获得自定义状态?

我有许多Celery任务,它们是长期运行的进程.因此,我想实现自定义状态以查询其进度.

根据文档,为给定​​任务实现自定义状态很容易.

def download_count(wget_base_path):
    # recursively traverse root folder and return count of files
    return sum([len(files) for r, d, files in os.walk(wget_base_path)])

@app.task(bind = True)
def html_download(self, url='', cl_id=-1):

    log = get_logger(__name__)
    ...
    # wget download location
    wget_base_path = settings.WGET_PATH + str(cl_id) 

    os.system(wget_cmd)

    if not self.request.called_directly:
        log.debug('State progress called')
        self.update_state(state = 'PROGRESS', meta = {'item_count' : download_count(wget_base_path)})
Run Code Online (Sandbox Code Playgroud)

现在,当我打电话给这个时

from app.ingest.tasks import html

ingest = html.html_download.delay(url, 54431)
Run Code Online (Sandbox Code Playgroud)

这项工作按预期开始.但是每当我尝试获得更新状态时,我都不会获得任何元数据.

例如,

In [6]: ingest.state
Out[6]: 'PENDING'


In [10]: ingest._get_task_meta()
Out[10]: {'result': …
Run Code Online (Sandbox Code Playgroud)

python task celery celery-task

6
推荐指数
1
解决办法
483
查看次数

芹菜加工'工人'退出'exitcode 1'

celery工作者(Flask应用程序的一部分)在开始之前被杀死:

Celery配置参数(Windows,Celery 3.1.25,Rabbitmq(最新))

flask_app = Flask(__name__)
flask_app.secret_key = 'some_secret'
flask_app.config['CELERY_BROKER_URL'] = 'amqp://localhost/'
flask_app.config['CELERY_RESULT_BACKEND'] = 'amqp://localhost/'

flask_app.config['CELERY_ACCEPT_CONTENT'] = ['json']
flask_app.config['CELERY_TASK_SERIALIZER'] = 'json'
flask_app.config['CELERY_RESULT_SERIALIZER'] = 'json'
flask_app.config['CELERY_IGNORE_RESULT'] = True

flask_app.config['CELERY_ROUTES'] = {'task': {'queue': 'agent_queue'}}
flask_app.config['CELERY_IMPORTS'] = ['Monitor.app']
Run Code Online (Sandbox Code Playgroud)

结果:

 -------------- celery-01 v3.1.25 (Cipater)
---- **** -----
--- * ***  * -- Windows-7-SP1
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         MonitorSetup.app:0x4aad030
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * …
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

6
推荐指数
1
解决办法
1208
查看次数

如何向特定的worker注册Celery任务?

我正在用 Python/Django 开发 Web 应用程序,并且有几个任务在 celery 中运行。

我必须一次运行一个任务 A,因此我使用 --concurrency=1 创建了工作线程,并使用以下命令将任务 A 路由到该工作线程。

celery -A proj worker -Q A -c 1 -l INFO
Run Code Online (Sandbox Code Playgroud)

一切工作正常,因为该工作进程处理任务 A 和其他任务被路由到默认队列。

但是,当我使用inspect命令为工作人员获取注册任务时,上述工作人员返回所有任务。这是绝对正确的,因为当我启动工作程序时,它将项目的所有任务显示为注册任务,但仅处理任务 A。

以下是我启动时工作人员的输出。

$ celery -A proj worker -Q A -c 1 -l INFO

 -------------- celery@pet_sms v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.8.10-040810-generic-x86_64-with-Ubuntu-16.04-xenial 2018-04-26 14:11:49
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x7f298a10d208
- ** ---------- .> transport:   redis://localhost:6379/0
- ** …
Run Code Online (Sandbox Code Playgroud)

celery celery-task django-celery celerybeat

6
推荐指数
1
解决办法
2738
查看次数

Celery-工人关闭后如何更新任务状态?

设定

芹菜3.0

经纪人= RabbitMQ

情境

任务已经被确认并开始处理,并且具有state=STARTED。然后,我想重新启动工作程序(以将工作程序更新为较新的版本)。重新启动worker(使用supervisorctl restart)后,那些长时间运行的任务将全部终止。但是他们的状态仍然在state=STARTED。如何将其状态更新为FAILURE其他值?(而且,我不希望在工作程序重新启动后再次执行这些任务。)

尝试过的方法(但不起作用)

  • 使用track_started=True---如果使用此选项,则state=STARTED在工作线程重新启动后,任务将保留在此处。如果没有此选项,则state=PENDING在工作程序重新启动后,任务将保留。
  • 使用CELERY_ACKS_LATE=True--- state=STARTED工作重新启动后,任务将保留在。并且再次执行任务,而不是期望的行为。
  • 使用signal(SIGTERM, handler)和处理程序函数来捕获信号。可以成功输入处理程序。但是,无论我将什么内容放入处理程序中,都无法更改任务的状态。状态保持不变,不会更改为FAILURE。在我尝试过的处理程序中
    • raise Exception
    • exit(0)
    • exit(1)

Celery是否有任何设置可以使其跟踪正在关闭的任务状态?

python python-2.x celery python-2.7 celery-task

6
推荐指数
1
解决办法
194
查看次数

Celery - 检查工作人员是否收到 SIGTERM

我有一个Celery相当长的任务。超过几分钟。

有时,由于各种原因,一个工作人员被标记为终止,而另一个工作人员则开始工作。如果需要更换运行它的计算机,或者正在部署新的代码版本,则可能会发生这种情况。在这种情况下,工作线程会收到 SIGTERM 信号。

我想知道任务本身是否可以定期检查该工作线程是否已收到 SIGTERM 并且正在等待终止,在这种情况下,只需将任务放回队列中并终止即可。(然后该任务将在另一个工作人员上启动,并继续执行其工作)

编辑:澄清 - 是否可以在任务中检查它是否在等待终止的工作线程上执行。像这样:

# Some long task that can take even a few hours.
def some_task(...):
    for i in range(...):
        do_some_work()
        # That's the missing function:
        if did_this_worker_received_SIGTERM_and_waiting_to_be_terminated():
             # stop the task in the middle, and it will be executed again later
Run Code Online (Sandbox Code Playgroud)

python celery celery-task

6
推荐指数
1
解决办法
6757
查看次数