当使用 TriggerDagRunOperator 触发另一个 DAG 时,它只给出一个通用名称,如 trig_timestamp:
是否可以为此运行 ID 指定一个有意义的名称,以便我可以轻松识别不同的 dag 运行?
您不能立即使用 执行此操作,因为TriggerDagOperator“run_id”是在其执行方法内生成的。但是,您可以实现自己的运算符,CustomTriggerDagOperator它会按照您想要/需要的方式运行。例如:
class CustomTriggerDagOperator(TriggerDagOperator):
def execute(self, context):
if self.execution_date is not None:
run_id = 'trig__{}'.format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date)
else:
run_id = 'trig__' + timezone.utcnow().isoformat()
run_id += f'{self.trigger_dag_id}'
dro = DagRunOrder(run_id=run_id)
if self.python_callable is not None:
dro = self.python_callable(context, dro)
if dro:
trigger_dag(dag_id=self.trigger_dag_id,
run_id=dro.run_id,
conf=json.dumps(dro.payload),
execution_date=self.execution_date,
replace_microseconds=False)
else:
self.log.info("Criteria not met, moving on")
Run Code Online (Sandbox Code Playgroud)
上面的示例只是附加了触发的 dag 的 id。您可以使用相同的策略来任意设置 run_id。
| 归档时间: |
|
| 查看次数: |
1908 次 |
| 最近记录: |