气流:为 TriggerDagRunOperator 设置自定义 run_id

Dav*_*ler 5 airflow

当使用 TriggerDagRunOperator 触发另一个 DAG 时,它只给出一个通用名称,如 trig_timestamp:

在此处输入图片说明

是否可以为此运行 ID 指定一个有意义的名称,以便我可以轻松识别不同的 dag 运行?

Pir*_*jas 5

您不能立即使用 执行此操作,因为TriggerDagOperator“run_id”是在其执行方法内生成的。但是,您可以实现自己的运算符,CustomTriggerDagOperator它会按照您想要/需要的方式运行。例如:

class CustomTriggerDagOperator(TriggerDagOperator):
    def execute(self, context):
        if self.execution_date is not None:
            run_id = 'trig__{}'.format(self.execution_date)
            self.execution_date = timezone.parse(self.execution_date)
        else:
            run_id = 'trig__' + timezone.utcnow().isoformat()

        run_id += f'{self.trigger_dag_id}'

        dro = DagRunOrder(run_id=run_id)
        if self.python_callable is not None:
            dro = self.python_callable(context, dro)
        if dro:
            trigger_dag(dag_id=self.trigger_dag_id,
                        run_id=dro.run_id,
                        conf=json.dumps(dro.payload),
                        execution_date=self.execution_date,
                        replace_microseconds=False)
        else:
            self.log.info("Criteria not met, moving on")
Run Code Online (Sandbox Code Playgroud)

上面的示例只是附加了触发的 dag 的 id。您可以使用相同的策略来任意设置 run_id。