初始化不同价值的芹菜工人

Pra*_*ran 8 celery flask python-2.7

我正在使用芹菜在Hadoop上运行长时间运行的任务.每个任务在Hadoop上执行一个Pig脚本,运行大约30分钟 - 2小时.

我当前的Hadoop设置有4个队列a,b,c和默认值.所有任务当前都由单个工作程序执行,该工作程序将作业提交到单个队列.

我想再添加3个将作业提交到其他队列的工作者,每个队列一个工作者.

问题是队列目前是硬编码的,我希望每个工人都有这个变量.

我搜索了很多,但我无法找到一种方法来传递每个芹菜工作者不同的队列值并在我的任务中访问它.

我这样开始我的芹菜工人.

celery -A app.celery worker
Run Code Online (Sandbox Code Playgroud)

我希望在命令行本身传递一些额外的参数并在我的任务中访问它,但芹菜抱怨它不理解我的自定义参数.

我计划通过设置--concurrency=3参数来运行同一主机上的所有worker .有没有解决这个问题的方法?

谢谢!

编辑

目前的情况是这样的.我每次尝试执行任务print_something都说tasks.print_something.delay()它只打印队列C.

@celery.task()
def print_something():
    print "C"
Run Code Online (Sandbox Code Playgroud)

我需要让工作人员根据我在启动时传递给他们的值来打印一个可变字母.

@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"
Run Code Online (Sandbox Code Playgroud)

Pra*_*ran 4

希望这对某人有帮助。

这个问题需要解决多个问题。

第一步涉及在 celery 中添加对自定义参数的支持。如果不这样做,celery 会抱怨它不理解该参数。

因为我用 Flask 运行 celery,所以我像这样初始化 celery。

def configure_celery():
    app.config.update(
        CELERY_BROKER_URL='amqp://:@localhost:5672',
        RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'            
    )
    celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)

我调用这个函数来初始化 celery 并将其存储在一个名为 celery 的变量中。

celery = configure_celery()
Run Code Online (Sandbox Code Playgroud)

要添加自定义参数,您需要执行以下操作。

def add_hadoop_queue_argument_to_worker(parser):
    parser.add_argument(
        '--hadoop-queue', help='Hadoop queue to be used by the worker'
    )
Run Code Online (Sandbox Code Playgroud)

下面使用的芹菜是我们通过上述步骤获得的芹菜。

celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)
Run Code Online (Sandbox Code Playgroud)

下一步是让工作人员可以访问这个参数。为此,请按照以下步骤操作。

class HadoopCustomWorkerStep(bootsteps.StartStopStep):

    def __init__(self, worker, **kwargs):
        worker.app.hadoop_queue = kwargs['hadoop_queue']
Run Code Online (Sandbox Code Playgroud)

通知 celery 使用此类来创建工人。

celery.steps['worker'].add(HadoopCustomWorkerStep)
Run Code Online (Sandbox Code Playgroud)

任务现在应该能够访问变量。

@app.task(bind=True)
def print_hadoop_queue_from_config(self):
    print self.app.hadoop_queue
Run Code Online (Sandbox Code Playgroud)

通过在命令行上运行工作程序来验证它。

celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
Run Code Online (Sandbox Code Playgroud)