在Airflow中创建动态工作流的正确方法

cos*_*ouc 66 python workflow airflow

问题

在Airflow中是否有任何方法可以创建工作流程,以便任务数量B.*在任务A完成之前是未知的?我查看了子标记,但看起来它只能用于必须在Dag创建时确定的一组静态任务.

dag会触发工作吗?如果是这样,请你举个例子.

我有一个问题是,在任务A完成之前,无法知道计算任务C所需的任务B的数量.每个任务B.*将需要几个小时来计算,不能合并.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

想法#1

我不喜欢这个解决方案,因为我必须创建一个阻塞的ExternalTask​​Sensor,所有的任务B.*需要2到24小时才能完成.所以我认为这不是一个可行的解决方案.当然有一种更简单的方法吗?或者Airflow不是为此而设计的?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
Run Code Online (Sandbox Code Playgroud)

编辑1:

截至目前,这个问题仍然没有很好的答案.有几个人正在寻找解决方案.

Ole*_*min 21

以下是我在没有任何子标签的情况下使用类似请求的方法:

首先创建一个返回所需值的方法

def values_function():
     return values
Run Code Online (Sandbox Code Playgroud)

接下来创建将动态生成作业的方法:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)
Run Code Online (Sandbox Code Playgroud)

然后结合它们:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete
Run Code Online (Sandbox Code Playgroud)

  • 而不是'for i in values_function()`我会期望在push_func_output`中使用`for i.问题是我找不到动态获取输出的方法.PythonOperator的输出将在执行后出现在Xcom中,但我不知道是否可以从DAG定义中引用它. (6认同)
  • 在我的“values_function”中,我有:“id_list = kwargs['dag_run'].conf.get('param_id_list')”,然后返回 id_list。它将在 `Broken DAG: [my_dag.py] 'dag_run'` 中收到错误。但如果我像“id_list = [1,2,3]”那样硬编码它,那就没问题了。我可以从参数值设置“id_list”吗? (2认同)

Gab*_*abe 18

仅适用于 v2.3 及以上版本:

此功能是使用动态任务映射实现的,仅适用于 Airflow 2.3 及更高版本

更多文档和示例在这里:

例子:

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    print(list(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())
Run Code Online (Sandbox Code Playgroud)

示例2:

from airflow import XComArg

task = MyOperator(task_id="source")

downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))
Run Code Online (Sandbox Code Playgroud)

图形视图和树视图也已更新:

  • 图形视图
  • 树视图

相关问题在这里:


Chr*_*eck 10

我已经找到了一种根据先前任务的结果创建工作流的方法。
基本上,您想要做的是具有两个以下子子项:

  1. Xcom在首先执行的子数据中推送一个列表(或以后需要创建动态工作流的内容)(请参见test1.py def return_list()
  2. 将主要dag对象作为参数传递给第二个subdag
  3. 现在,如果您有主dag对象,则可以使用它来获取其任务实例的列表。从该任务实例列表中,您可以使用parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1])过滤掉当前运行的任务,您可能会在此处添加更多过滤器。
  4. 在该任务实例中,您可以使用xcom pull通过将dag_id指定为第一个subdag之一来获取所需的值: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. 使用列表/值动态创建任务

现在,我已经在本地气流安装中对此进行了测试,并且工作正常。我不知道如果同时运行多个dag实例,xcom pull部分是否会有问题,但是您可能会使用唯一键或类似的东西来唯一地标识xcom想要的价值。可能可以将3.步骤优化为100%确保获得当前主dag的特定任务,但是对于我的使用来说,这执行得很好,我认为一个人只需要一个task_instance对象即可使用xcom_pull。

另外,在每次执行之前,我都会先清理xcoms的第一个subdag,以确保不会意外得到任何错误的值。

我很难解释,所以我希望下面的代码能使所有内容变得清晰:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag
Run Code Online (Sandbox Code Playgroud)

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag
Run Code Online (Sandbox Code Playgroud)

和主要工作流程:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
Run Code Online (Sandbox Code Playgroud)

  • 嗨,@克里斯托弗·贝克(Christopher Beck),我发现我的错误是我需要在subdags文件夹中添加_ _init_ _.py。新秀错误 (3认同)
  • 为什么这些需要放在单独的文件中?是否有必要或者可以在一个文件中创建相同的 DAG? (2认同)

Muh*_*Ali 8

我认为您正在寻找的是动态创建 DAG 我在几天前经过一番搜索后遇到了这种情况,我发现了这个博客

动态任务生成

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task
Run Code Online (Sandbox Code Playgroud)

设置 DAG 工作流程

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end
Run Code Online (Sandbox Code Playgroud)

这是我们的 DAG 将代码放在一起后的样子 在此处输入图片说明

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end
Run Code Online (Sandbox Code Playgroud)

这是非常有帮助的,希望它也能帮助其他人

  • 如果表项可能会发生变化,因此我们无法将它们放入静态 yaml 文件中怎么办? (2认同)

alw*_*mpe 8

一个很好的答案

太多了?反正。

很多其他答案都有点方钉圆孔。添加复杂的新运算符,滥用内置变量,或者在某种程度上未能回答问题。我对它们中的任何一个都不是特别满意,因为它们要么在通过 Web UI 查看时隐藏自己的行为,要么容易被破坏,要么需要大量自定义代码(这也很容易被破坏)。

该解决方案使用内置功能,不需要新的运算符和有限的附加代码,DAG 通过 UI 可见,无需任何技巧,并遵循气流最佳实践(请参阅幂等性)。

这个问题的解决方案相当复杂,所以我把它分成几个部分。这些都是:

  • 如何安全地触发动态数量的任务
  • 如何等待所有这些任务完成然后调用最终任务
  • 如何将其集成到您的任务管道中
  • 局限性(没有什么是完美的)

一个任务可以触发动态数量的其他任务吗?

是的。有点。无需编写任何新的运算符,仅使用内置运算符即可让 DAG 触发动态数量的其他 DAG。然后可以将其扩展为使 DAG 依赖于动态数量的其他 DAG(请参阅等待任务完成)。这与flinz 的解决方案类似,但更强大并且自定义代码更少。

这是使用 BranchPythonOperator 来完成的,该 BranchPythonOperator 有选择地触发 2 个其他 TriggerDagRunOperator。其中一个递归地重新调用当前 DAG,另一个调用外部 dag,即目标函数。

recursive_dag.py 的顶部给出了可用于触发 dag 的示例配置。

print_conf.py(要触发的 DAG 示例)

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )
Run Code Online (Sandbox Code Playgroud)

recursive_dag.py(魔法发生的地方)

"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'

def choose_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    # dump to string because value is stringified into
    # template string, is then parsed.
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]

    return next_task_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
    return add_braces(pull)

with DAG(
    dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id),
        conf=make_templated_pull(next_task_conf)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=dag_id,
        conf=make_templated_pull(repeat_task_conf)
    )

    branch >> [trigger_next, trigger_repeat]
Run Code Online (Sandbox Code Playgroud)

该解决方案的优点是使用非常有限的自定义代码。flinz 的解决方案可能会中途失败,导致某些计划任务完成,而另一些则不完成。然后在重试时,DAGS 可能会被安排运行两次,或者在第一个 dag 上失败,导致失败的任务完成部分工作。此方法将告诉您哪些 DAG 触发失败,并仅重试触发失败的 DAG。因此,这种方法是幂等的,而另一种则不是。

DAG 是否可以依赖于动态数量的其他 DAGS?

是的,但是...如果任务不并行运行,这可以轻松完成。并行运行比较复杂。

要按顺序运行,重要的更改是使用wait_for_completion=Truein trigger_next,使用 python 运算符在“trigger_next”之前设置 xcom 值,并添加一个启用或禁用重复任务的分支运算符,然后具有线性依赖关系

setup_xcom >> trigger_next >> branch >> trigger_repeat
Run Code Online (Sandbox Code Playgroud)

要并行运行,您可以类似地递归链接多个使用模板external_dag_id值以及与触发的 dag 运行关联的时间戳的ExternalTask​​Sensor。要获取触发的 dag 时间戳,您可以使用触发 dag 的时间戳来触发 dag。然后这些传感器一一等待所有创建的 DAG 完成,然后触发最终的 DAG。下面的代码,这次我向打印输出 DAG 添加了随机睡眠,以便等待 dags 实际上进行一些等待。

注意:recurse_wait_dag.py 现在定义了 2 个 dags,这两个 dags 都需要启用才能正常工作。

recurse_wait_dag.py 顶部给出了可用于触发 dag 的示例配置

print_conf.py(修改为添加随机睡眠)

"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""

from datetime import timedelta
from time import sleep
from random import randint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    sleep_time = randint(15,30)
    print(f'sleeping for time: {sleep_time}')
    sleep(sleep_time)
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )
Run Code Online (Sandbox Code Playgroud)

recurse_wait_dag.py(更多魔法发生的地方)

"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
    "final_task": "print_output",
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""


from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone

from common import make_templated_pull

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

def to_conf(id):
    return f'{id}_conf'

def to_execution_date(id):
    return f'{id}_execution_date'

def to_ts(id):
    return f'{id}_ts'

recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)

wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)

def choose_branch(task_instance, dag_run, ts):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    # can't have multiple dag runs of same DAG with same timestamp 
    assert next_task != recurse_dag_id
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]
    triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)
    task_instance.xcom_push(key=next_task_execution_date, value=ts)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs,
            'triggered_tasks': triggered_tasks,
            'final_task': dag_conf['final_task']
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]
    
    end_conf = json.dumps({
        'tasks_to_wait': triggered_tasks,
        'final_task': dag_conf['final_task']
    })
    task_instance.xcom_push(key=end_task_conf, value=end_conf)

    return [next_task_id, end_task_id]

def choose_wait_target(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']
    next_task, next_ts = tasks_to_wait[0]
    later_tasks = tasks_to_wait[1:]
    task_instance.xcom_push(key=next_wait_id, value=next_task)
    task_instance.xcom_push(key=next_wait_ts, value=next_ts)
    
    if later_tasks:
        repeat_conf = json.dumps({
            'tasks_to_wait': later_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        
def execution_date_fn(_, task_instance):
    date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
    return timezone.parse(date_str)

def choose_wait_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']

    if len(tasks_to_wait) == 1:
        return end_task_id

    return repeat_task_id

with DAG(
    recurse_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as recursive_dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id, branch_id),
        execution_date=make_templated_pull(next_task_execution_date, branch_id),
        conf=make_templated_pull(next_task_conf, branch_id)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(repeat_task_conf, branch_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(end_task_conf, branch_id)
    )

    branch >> [trigger_next, trigger_repeat, trigger_end]

with DAG(
    wait_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as wait_dag:
    py_operator = PythonOperator(
        task_id=choose_wait_id,
        python_callable=choose_wait_target
    )

    sensor = ExternalTaskSensor(
        task_id='do_wait',
        external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
        execution_date_fn=execution_date_fn
    )

    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_wait_branch
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(repeat_task_conf, choose_wait_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
    )

    py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
Run Code Online (Sandbox Code Playgroud)

与您的代码集成

太好了,但您想实际使用它。那么,你需要做什么?该问题包括一个尝试执行以下操作的示例:

             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|
Run Code Online (Sandbox Code Playgroud)

为了实现问题目标(下面的示例实现),您需要将任务 A、B 和 C 分成各自的 DAG。然后,在 DAG A 末尾添加一个新运算符,触发上述 DAG 'recurse_then_wait'。向此 dag 传递一个配置,其中包括每个 B DAG 所需的配置以及 B dag id(可以轻松更改以使用不同的 dag,加油)。然后包含 DAG C 的名称,即最终要运行的 DAG。此配置应如下所示:

{
    "final_task": "C_DAG",
    "task_list": ["B_DAG","B_DAG"],
    "conf_list": [
        {
            "b_number": 1,
            "more_stuff": "goes_here"
        },
        {
            "b_number": 2,
            "foo": "bar"
        }
    ]
}
Run Code Online (Sandbox Code Playgroud)

实施后,它应该看起来像这样:

触发递归.py

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

from recurse_wait_dag import recurse_dag_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key, task_id):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
    return add_braces(pull)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'

def setup_trigger_conf(task_instance):
    trigger_conf = {
        'final_task': 'print_output',
        'task_list': ['print_output','print_output'],
        'conf_list': [
            {
                'output': 'Hello'
            },
            {
                'output': 'world!'
            }
        ]
    }

    print('Triggering the following tasks')
    for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
        print(f'    task: {task} with config {json.dumps(conf)}')
    print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')

    task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))

with DAG(
    'trigger_recurse_example',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    py_operator = PythonOperator(
        task_id=setup_trigger_conf_id,
        python_callable=setup_trigger_conf
    )

    trigger_operator = TriggerDagRunOperator(
        task_id='trigger_call_and_wait',
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
    )

    py_operator >> trigger_operator
Run Code Online (Sandbox Code Playgroud)

所有这些最终看起来如下所示,其中垂直和水平线显示一个 DAG 触发另一个 DAG 的位置:

A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C
Run Code Online (Sandbox Code Playgroud)

局限性

任务不再在单个图表上可见。这可能是这种方法的最大问题。通过向所有关联的 DAG 添加标签,至少可以一起查看 DAG。然而,将 DAG B 的多个并行运行与 DAG A 的运行联系起来是很混乱的。但是,由于单个 DAG 运行显示其输入配置,这意味着每个 DAG B 运行不依赖于 DAG A,仅依赖于其输入配置。因此,这种关系至少可以部分忽略。

任务无法再使用 xcom 进行通信。B 任务可以通过 DAG 配置接收来自任务 A 的输入,但是任务 C 无法从 B 任务获取输出。所有 B 任务的结果应放入已知位置,然后由任务 C 读取。

'recurse_and_wait' 的配置参数也许可以改进以组合 task_list 和 conf_list,但这解决了所述问题。

最终 DAG 没有配置。解决这个问题应该是微不足道的。


Ena*_*Ena 7

OA:“Airflow 中是否有任何方法可以创建工作流,使得任务 B.* 的数量在任务 A 完成之前是未知的?”

简短的回答是否定的。Airflow 将在开始运行之前构建 DAG 流。

也就是说,我们得出了一个简单的结论,那就是我们没有这样的需求。当您想要并行处理某些工作时,您应该评估可用的资源,而不是要处理的项目数量。

我们是这样做的:我们动态生成固定数量的任务,比如 10 个,这将拆分工作。例如,如果我们需要处理 100 个文件,每个任务将处理其中的 10 个。我将在今天晚些时候发布代码。

更新

这是代码,抱歉延迟。

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['myemail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end
Run Code Online (Sandbox Code Playgroud)

代码说明:

这里我们有一个开始任务和一个结束任务(都是虚拟的)。

然后从带有 for 循环的 start 任务开始,我们创建了 10 个具有相同 python 可调用项的任务。任务在函数 create_dynamic_task 中创建。

我们将并行任务的总数和当前任务索引作为参数传递给每个 python 可调用对象。

假设您有 1000 个项目要详细说明:第一个任务将收到输入,它应该详细说明 10 个块中的第一个块。它将 1000 个项目分成 10 个块并详细说明第一个。

  • @AnthonyKeane 这是您应该调用的python 函数来实际执行某些操作。正如代码中所评论的,它将把总数和当前数字作为输入来详细说明总元素的块。 (2认同)

rot*_*ten 6

作业图不是在运行时生成的。相反,当 Airflow 从 dags 文件夹中拾取图表时,就会构建该图表。因此,每次运行作业时实际上都不可能有不同的图表。您可以配置作业以在加载时基于查询构建图表。此后的每次运行该图都将保持不变,这可能不是很有用。

您可以设计一个图表,使用分支运算符根据查询结果在每次运行时执行不同的任务。

我所做的是预先配置一组任务,然后获取查询结果并将它们分布在任务中。无论如何,这可能更好,因为如果您的查询返回大量结果,您可能不想让调度程序充满大量并发任务。为了更安全,我还使用了一个池来确保并发性不会因意外的大查询而失控。

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
Run Code Online (Sandbox Code Playgroud)


Kyl*_*ine 5

是的,这是可能的,我创建了一个示例DAG来演示这一点。

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
Run Code Online (Sandbox Code Playgroud)

在运行DAG之前,请创建以下三个气流变量

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0
Run Code Online (Sandbox Code Playgroud)

您会看到DAG与此不同

在此处输入图片说明

运行后对此

在此处输入图片说明

您可以在我的有关在气流上创建动态工作流的文章中看到有关此DAG的更多信息。

  • 但是如果你有多个这个 DAG 的 DagRun 会发生什么。他们都共享相同的变量吗? (2认同)
  • 是的,他们会使用相同的变量;我在最后的文章中解决了这个问题。您需要动态创建变量并在变量名称中使用 dag 运行 ID。我的示例很简单,只是为了演示动态可能性,但您需要使其具有生产质量:) (2认同)
  • @jvans 谢谢,它很聪明,但可能不是生产质量 (2认同)
  • 好主意!我发现这个框架很有用,但我从你的评论中受益,凯尔。因此,当我需要根据本地未保存的信息动态创建任务时,我首先使用运算符从(在我的例子中)S3 获取该信息并设置气流变量。然后我可以使用该变量来设置动态 dags,并且如果远程存储发生更改,仍然依赖它进行更新。这非常好,因为它消除了每次调度程序刷新 DAG 列表时运行更复杂的顶级代码的开销。感谢您在这里进行有用的讨论! (2认同)