Vat*_*iya 6 python workflow directed-acyclic-graphs airflow
我想使用一个脚本生成多个气流 dag。dag 名称应为“test_parameter”。下面是我的脚本:
from datetime import datetime
# Importing Airflow modules
from airflow.models import DAG
from airflow.operators import DummyOperator
# Specifying the default arguments for the DAG
default_args = {
'owner': 'Test',
'start_date': datetime.now()
}
parameter_list = ["abc", "pqr", "xyz"]
for parameter in parameter_list:
dag = DAG("test_"+parameter,
default_args=default_args,
schedule_interval=None)
dag.doc_md = "This is a test dag"
# Creating Start Dummy Operator
start = DummyOperator(
task_id="start",
dag=dag)
# Creating End Dummy Operator
end = DummyOperator(
task_id="end",
dag=dag)
# Design workflow of tasks in the dag
end.set_upstream(start)
Run Code Online (Sandbox Code Playgroud)
所以在这种情况下,它应该创建 3 个 dag:“test_abc”、“test_pqr”和“test_xyz”。
但是在运行脚本时,它只会创建一个 dag“test_xyz”。有关如何解决此问题的任何见解。提前致谢 :)
是的,这是可能的,您可以将每个 DAG 的配置保存在存储中。例如,您可以将配置保存在持久存储 (DB) 中,然后获取配置并将结果保存在缓存中。这样做主要是因为我们想防止每次 DAG 脚本刷新时 dag 脚本从数据库中获取配置。因此,我们使用缓存并保存其过期时间。你可以参考这篇关于如何创建动态 DAG 的文章
for i in range(10):
dag_id = 'foo_{}'.format(i)
globals()[dag_id] = DAG(dag_id)
Run Code Online (Sandbox Code Playgroud)
反过来,您还希望创建动态子 DAG 和动态任务。希望能帮助到你 :-)
小智 2
我猜问题是 dag 对象“start”和“end”被 forloop 覆盖,因此只保留最后一个值。
奇怪的是,虽然不能动态创建dag,但是可以通过循环动态创建任务。也许这有帮助。
for i in range(3):
t1 = BashOperator(
task_id='Success_test'+str(i),
bash_command='cd home',
dag=dag)
slack_notification.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5626 次 |
| 最近记录: |