Airflow 1.10.3 SubDag 并发数为 8 时也只能并行运行 1 个任务

Kev*_* Li 12 directed-acyclic-graphs airflow airflow-scheduler

最近,我将 Airflow 从 1.9 升级到 1.10.3(最新版本)。

但是,我确实注意到与 SubDag 并发相关的性能问题。SubDag 中只能拾取 1 个任务,这不是应该的方式,我们对 SubDag 的并发设置为 8。

见下: get_monthly_summary-214get_monthly_summary-215是两个SubDag,可以由父dag并发在controller中并行运行

在此处输入图片说明

但是当放大 SubDag 时说get_monthly_summary-214,然后 在此处输入图片说明 你肯定可以看到一次只有 1 个任务在运行,其他的都在排队,一直这样运行。当我们检查SubDag并发时,实际上是我们在代码中指定的8: 在此处输入图片说明

我们确实设置了池槽大小,它是 32,我们确实有 8 个 celery worker 来接收排队的任务,我们与并发关联的气流配置如下:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
Run Code Online (Sandbox Code Playgroud)

此外,所有 SubDag 都使用名为 的队列进行配置mini,而其所有内部任务都是名为 的默认队列default,因为如果我们在同一队列上同时运行 SubDag 运算符和 SubDag 内部任务,我们可能会出现一些死锁问题。我还尝试将default队列用于所有任务和操作员,但无济于事。

旧版本 1.9 似乎很好,每个 SubDag 可以并行执行多个任务,我们错过了什么吗?

Kev*_* Li 10

根据上面发布的 @kaxil 的发现,如果您仍然希望并行执行 subdag 内的任务,则解决方案是创建一个包装函数来显式传递executorwhen 构造SubDagOperator

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

sub_dag_operator_with_default_executor创建 subdag 运算符时调用。为了减轻 sub dag 运营商的性能担忧

我们应该将 subdag_operator 的默认执行器更改为 SequentialExecutor。气流池不受 subdagooperator 的支持,因此它可能会消耗所有工作人员资源(例如在 celeryExecutor 中)。这会导致 airflow-74 中提到的问题并限制 subdag_operator 的使用。我们通过指定使用顺序执行器在生产中使用 subdag_operator。

我们建议创建一个特殊的队列(在我们的例子中我们指定 queue='mini')和 celery worker 来处理 subdag_operator,这样它就不会消耗你所有的普通 celery worker 的资源。如下:

 dag = DAG(
    dag_id=DAG_NAME,
    description=f"{DAG_NAME}-{__version__}",
    ...
)    
with dag:
        ur_operator = sub_dag_operator_with_default_executor(
                task_id=f"your_task_id",
                subdag=load_sub_dag(
                    parent_dag_name=DAG_NAME,
                    child_dag_name=f"your_child_dag_name",
                    args=args,
                    concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
                ),
                queue="mini",
                dag=dag
            )
Run Code Online (Sandbox Code Playgroud)

然后当你创建你的特殊 celery worker(我们使用像 2 核和 3G 内存的轻量级主机)时,指定AIRFLOW__CELERY__DEFAULT_QUEUEas mini,取决于你想要并行运行多少子 dag 操作符,你应该创建多个特殊的 celery worker负载均衡资源,我们建议每个特殊的celery worker一次最多处理2个sub dag operator,否则会被耗尽(例如,在2核3G内存的主机上内存不足)

您也可以concurrency通过concurrency_in_sub_dag在气流 UIVariables配置页面中创建的 ENV VAR 在您的 subdag 内部进行调整。

更新 [22/05/2020] 以上仅适用于气流 (<=1.10.3, >= 1.10.0) 对于 1.10.3 以上的气流,请使用

from airflow.executors import get_default_executor
Run Code Online (Sandbox Code Playgroud)

反而。

  • 很有帮助。注意,从 1.10.4 开始,`GetDefaultExecutor` 已更名为 `get_default_executor` (4认同)

kax*_*xil 7

这是因为在 Airflow 1.9.0 中,SubdagOperator 使用了默认的 Executor。

气流 1.9.0https : //github.com/apache/airflow/blob/1.9.0/airflow/operators/subdag_operator.py#L33

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=GetDefaultExecutor(),
            *args, **kwargs):
Run Code Online (Sandbox Code Playgroud)

但是,从 Airflow 1.10 开始,SubDagOperator 的默认执行器更改为 SequentialExecutor

气流 >=1.10 : https://github.com/apache/airflow/blob/1.10.0/airflow/operators/subdag_operator.py#L38

class SubDagOperator(BaseOperator):

    template_fields = tuple()
    ui_color = '#555'
    ui_fgcolor = '#fff'

    @provide_session
    @apply_defaults
    def __init__(
            self,
            subdag,
            executor=SequentialExecutor(),
            *args, **kwargs):
Run Code Online (Sandbox Code Playgroud)

改变它的提交是https://github.com/apache/airflow/commit/64d950166773749c0e4aa0d7032b080cadd56a53#diff-45749879e4753a355c5bdb5203584698

更改的详细原因可以在https://github.com/apache/airflow/pull/3251 中找到

我们应该将 subdag_operator 的默认执行器更改为 SequentialExecutor。气流池不受 subdagooperator 的支持,因此它可能会消耗所有工作人员资源(例如在 celeryExecutor 中)。这会导致 airflow-74 中提到的问题并限制 subdag_operator 的使用。我们通过指定使用顺序执行器在生产中使用 subdag_operator。