使用TriggerDagRunOperator多次运行另一个DAG

yee*_*379 8 call directed-acyclic-graphs airflow apache-airflow

我有一个DAG(DAG1),我复制了一堆文件.然后,我想为每个复制的文件启动另一个DAG(DAG2).由于复制的文件数量因DAG1运行而异,我希望基本上循环遍历文件并使用适当的参数调用DAG2.

例如:

with DAG( 'DAG1',
        description="copy files over",
        schedule_interval="* * * * *",
        max_active_runs=1
    ) as dag:


    t_rsync = RsyncOperator( task_id='rsync_data',
        source='/source/',
        target='/destination/' )

    t_trigger_preprocessing = TriggerDagRunOperator( task_id='trigger_preprocessing',
        trigger_daq_id='DAG2',
        python_callable=trigger

    )

    t_rsync >> t_trigger_preprocessing
Run Code Online (Sandbox Code Playgroud)

我希望使用python_callable trigger从中提取相关的xcom数据t_rsync,然后触发DAG2; 但我不清楚如何做到这一点.

我更愿意在这里调用DAG2的逻辑来简化DAG2的内容(并提供堆栈原理图max_active_runs)

yee*_*379 7

最后写了我自己的运算符:

class TriggerMultipleDagRunOperator(TriggerDagRunOperator):
    def execute(self, context):
        count = 0
        for dro in self.python_callable(context):
            if dro:
                with create_session() as session:
                    dbag = DagBag(settings.DAGS_FOLDER)
                    trigger_dag = dbag.get_dag(self.trigger_dag_id)
                    dr = trigger_dag.create_dagrun(
                        run_id=dro.run_id,
                        state=State.RUNNING,
                        conf=dro.payload,
                        external_trigger=True)
                    session.add(dr)
                    session.commit() 
                    count = count + 1
            else:
                self.log.info("Criteria not met, moving on")
        if count == 0:
            raise AirflowSkipException('No external dags triggered')
Run Code Online (Sandbox Code Playgroud)

用python_callable之类的

def trigger_preprocessing(context):
    for base_filename,_ in found.items():
        exp = context['ti'].xcom_pull( task_ids='parse_config', key='experiment')
        run_id='%s__%s' % (exp['microscope'], datetime.utcnow().replace(microsecond=0).isoformat())
        dro = DagRunOrder(run_id=run_id) 
        d = { 
            'directory': context['ti'].xcom_pull( task_ids='parse_config', key='experiment_directory'),
            'base': base_filename,
            'experiment': exp['name'],
        }
        LOG.info('triggering dag %s with %s' % (run_id,d))
        dro.payload = d
        yield dro
    return
Run Code Online (Sandbox Code Playgroud)

然后把它们绑在一起:

t_trigger_preprocessing = TriggerMultipleDagRunOperator( task_id='trigger_preprocessing',
    trigger_dag_id='preprocessing',
    python_callable=trigger_preprocessing
)
Run Code Online (Sandbox Code Playgroud)