如何根据上游任务返回的列表动态生成任务。
我尝试了以下方法:
使用外部文件从列表中写入和读取 - 此选项有效,但我正在寻找更优雅的解决方案。
Xcom 拉进一个 subdag 工厂。这是行不通的。我能够将一个列表从上游任务传递给一个 subdag,但是 xcom 只能在 subdag 的任务内部访问,并且不能用于循环/迭代返回的列表并生成任务。例如 subdag 工厂方法。
def subdag1(parent_dag_name, child_dag_name, default_args,**kwargs):
dag_subdag = DAG(
dag_id='%s.%s' % (parent_dag_name, child_dag_name),
default_args=default_args,
schedule_interval="@once",
)
list_files='{{ task_instance.xcom_pull( dag_id="qqq",task_ids="push")}}'
for newtask in list_files:
BashOperator(
task_id='%s-task-%s' % (child_dag_name, 'a'),
default_args=default_args,
bash_command= 'echo '+ list_files + newtask,
dag=dag_subdag,
)
return dag_subdag
Run Code Online (Sandbox Code Playgroud) 我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。
这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。
这是我的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op …Run Code Online (Sandbox Code Playgroud) 我有一个 DAG,它是通过查询 DynamoDB 的列表创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。在下面的示例中未显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用它set_upstream来强制执行依赖关系。
- airflow_home
\- dags
\- workflow.py
Run Code Online (Sandbox Code Playgroud)
工作流.py
def get_task_list():
# ... query dynamodb ...
def run_task(task):
# ... do stuff ...
dag = DAG(dag_id='my_dag', ...)
tasks = get_task_list()
for task in tasks:
t = PythonOperator(
task_id=task['id'],
provide_context=False,
dag=dag,
python_callable=run_task,
op_args=[task]
)
Run Code Online (Sandbox Code Playgroud)
问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法受到 AWS 的限制并抛出异常。
我认为这是因为无论何时run_task()被调用,它都会运行所有全局变量,workflow.py所以我尝试run_task()进入一个单独的模块,如下所示:
- airflow_home
\- dags
\- workflow.py
\- mypackage
\- __init__
\- task.py
Run Code Online (Sandbox Code Playgroud)
但这并没有改变任何事情。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为方式仍然相同。 …