Kev*_*uli 5 airflow airflow-scheduler
我正在评估Airflow 1.9.0以满足我们的分布式业务流程需求(使用CeleryExecutor和RabbitMQ),并且看到了一些奇怪的事情。
我做了一个分为三个阶段的任务:
N可能很大,可能高达10K。我希望看到第二阶段开始时,N个任务被转储到Rabbit队列中。相反,我一次只能看到几百个。随着工作人员处理任务和队列变小,那么更多的工人将被添加到Celery / Rabbit中。最终,它确实完成了,但是我真的希望它立即将所有工作(所有10K任务)转储到Celery中,这有两个原因:
当前的方法使调度程序长寿且有状态。调度程序可能仅在完成5K任务后就死掉,在这种情况下,其余5K任务将永远不会添加(我验证了这一点)
我想使用Rabbit队列的大小作为指标来触发自动缩放事件以添加更多工作线程。因此,我需要真实地了解尚有多少出色的工作(10K,而不是几百)
我认为调度程序具有某种限制,使其无法同时转储所有10K消息吗?如果可以的话,这是可配置的吗?
仅供参考,我已经在airflow.cfg中将“并行度”设置为10K
这是我的测试达人:
# This dag tests how well airflow fans out
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('fan_out', default_args=default_args, schedule_interval=None)
num_tasks = 10000
starting = BashOperator(
task_id='starting',
bash_command='echo starting',
dag=dag
)
all_done = BashOperator(
task_id='all_done',
bash_command='echo all done',
dag=dag)
for i in range(0, num_tasks):
task = BashOperator(
task_id='say_hello_' + str(i),
bash_command='echo hello world',
dag=dag)
task.set_upstream(starting)
task.set_downstream(all_done)
Run Code Online (Sandbox Code Playgroud)
您还需要增加其他几个设置。
[core]增加之下non_pooled_task_slot_count。这将允许更多任务实际在芹菜中排队。
下[celery]增加celeryd_concurrency。这将增加每个工作人员将尝试同时从队列运行的任务数量。
话虽这么说,是为了回应您的第一个理由...
虽然为true,但是如果调度程序未运行,则其余任务不会排队,但这是因为Airflow调度程序的设计寿命很长。当您的工人在运行时,它应该一直在运行。如果调度程序因任何原因被杀死或死亡,一旦启动备份,它将在中断的地方继续工作。
感谢那些建议其他并发设置的人。通过反复试验,我了解到我需要设置所有这三个:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
Run Code Online (Sandbox Code Playgroud)
仅启用这两个任务时,我可以达到 10K,但速度非常慢,仅以阶梯方式每 30 秒突发添加 100 个新任务:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT=10000
Run Code Online (Sandbox Code Playgroud)
如果我只启用这两个,它是相同的“阶梯”模式,每 30 秒添加 128:
- AIRFLOW__CORE__PARALLELISM=10000
- AIRFLOW__CORE__DAG_CONCURRENCY=10000
Run Code Online (Sandbox Code Playgroud)
但如果我设置了所有三个,它确实会一次性向队列中添加 10K。
| 归档时间: |
|
| 查看次数: |
789 次 |
| 最近记录: |