如何动态嵌套Airflow DAG?

Pav*_*vel 5 airflow

我有一个由三个运算符组成的简单DAG。第一个是PythonOperator我们自己的功能,另外两个是标准的运营商从airflow.contribFileToGoogleCloudStorageOperatorGoogleCloudStorageToBigQueryOperator要准确)。它们按顺序工作。根据参数,我们的自定义任务会生成许多文件,通常在2到5之间。所有这些文件都必须由后续任务分别处理。这意味着我想要几个下游分支,但在运行DAG之前确切知道有多少个分支。

您将如何解决这个问题?

更新:

使用BranchPythonOperator他在另一个答复中提到的jhnclvr 作为出发点,我创建了一个根据条件跳过或继续执行分支的运算符。该方法仅是可行的,因为已知最大可能的分支数并且足够小。

运营商:

class SkipOperator(PythonOperator):
    def execute(self, context):
        boolean = super(SkipOperator, self).execute(context)
        session = settings.Session()
        for task in context['task'].downstream_list:
            if boolean is False:
                ti = TaskInstance(
                    task, execution_date=context['ti'].execution_date)
                ti.state = State.SKIPPED
                ti.start_date = datetime.now()
                ti.end_date = datetime.now()
                session.merge(ti)
        session.commit()
        session.close()
Run Code Online (Sandbox Code Playgroud)

用法:

def check(i, templates_dict=None, **kwargs):
    return len(templates_dict["data_list"].split(",")) > i

dag = DAG(
    dag_name,
    default_args=default_args,
    schedule_interval=None
)

load = CustomOperator(
    task_id="load_op",
    bash_command=' '.join([
        './command.sh'
        '--data-list {{ dag_run.conf["data_list"]|join(",") }}'
    ]),
    dag=dag
)

for i in range(0, 5):
    condition = SkipOperator(
        task_id=f"{dag_name}_condition_{i}",
        python_callable=partial(check, i),
        provide_context=True,
        templates_dict={"data_list": '{{ dag_run.conf["data_list"]|join(",") }}'},
        dag=dag
    )
    gs_filename = 'prefix_{{ dag_run.conf["data_list"][%d] }}.json' % i

    load_to_gcs = CustomFileToGoogleCloudStorageOperator(
        task_id=f"{dag_name}_to_gs_{i}",
        src='/tmp/{{ run_id }}_%d.{{ dag_run.conf["file_extension"] }}' % i,
        bucket=gs_bucket,
        dst=gs_filename,
        mime_type='application/json',
        google_cloud_storage_conn_id=connection_id,
        dag=dag
    )
    load_to_bq = GoogleCloudStorageToBigQueryOperator(
        task_id=f"{dag_name}_to_bq_{i}",
        bucket=gs_bucket,
        source_objects=[gs_filename, ],
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='myproject.temp_{{ dag_run.conf["data_list"][%d] }}' % i,
        bigquery_conn_id=connection_id,
        schema_fields={},
        google_cloud_storage_conn_id=connection_id,
        write_disposition='WRITE_TRUNCATE',
        dag=dag
    )

    condition.set_upstream(load)
    load_to_gcs.set_upstream(condition)
    load_to_bq.set_upstream(load_to_gcs)
Run Code Online (Sandbox Code Playgroud)

jhn*_*lvr 5

在这里看到类似(但不同)的问题

基本上,您无法在任务DAG运行时为其添加任务。您需要提前知道要添加多少个任务。

您可以使用一个运算符处理N个文件。

或者,如果您还有另一个处理文件的dag,则可以触发该DAGN次,在conf中传递文件名。

请参阅此处的示例TriggerDagRunOperator

请参阅此处了解DAG将会触发的。

最后,看看上面示例来自的帖子。