Ash*_*n K 5 directed-acyclic-graphs airflow
我有一个场景,一个特定的dag完成后需要触发多个dag,是否使用过TriggerDagRunOperator来触发单个dag,是否可以将多个dag传递给TriggerDagRunOperator来触发多个dag?
并且有可能仅在成功完成当前dag时触发。
我也遇到过同样的问题。并没有开箱即用的解决方案,但是我们可以为其编写一个自定义运算符。
因此,此处为get python_callable和trigger_dag_idas参数的自定义运算符的代码:
class TriggerMultiDagRunOperator(TriggerDagRunOperator):
@apply_defaults
def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
self.op_args = op_args or []
self.op_kwargs = op_kwargs or {}
def execute(self, context):
session = settings.Session()
created = False
for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
if not dro or not isinstance(dro, DagRunOrder):
break
if dro.run_id is None:
dro.run_id = 'trig__' + datetime.utcnow().isoformat()
dbag = DagBag(settings.DAGS_FOLDER)
trigger_dag = dbag.get_dag(self.trigger_dag_id)
dr = trigger_dag.create_dagrun(
run_id=dro.run_id,
state=State.RUNNING,
conf=dro.payload,
external_trigger=True
)
created = True
self.log.info("Creating DagRun %s", dr)
if created is True:
session.commit()
else:
self.log.info("No DagRun created")
session.close()
Run Code Online (Sandbox Code Playgroud)
trigger_dag_id 是dag id我们想要多次运行。
python_callable是一个函数,它应该返回一个DagRunOrder对象列表,一个对象用于调度带有dag_id的DAG的一个实例trigger_dag_id。
GitHub上的代码和示例:https : //github.com/mastak/airflow_multi_dagrun 关于此代码的更多描述:https : //medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13
| 归档时间: |
|
| 查看次数: |
3360 次 |
| 最近记录: |