Pra*_*ngh 7 airflow apache-airflow airflow-scheduler
I am trying to run an Airflow DAG and need to pass some parameters to the tasks.
How can I read, inside the Python DAG file, the JSON string that was passed as the --conf parameter on the trigger_dag command line?
For example: airflow trigger_dag 'dag_name' -r 'run_id' --conf '{"key":"value"}'
Dan*_*ang 12
Two ways. From a templated field or file:
{{ dag_run.conf['key'] }}
Or, when the context is available, e.g. inside the python callable of a PythonOperator:
context['dag_run'].conf['key']
In the example provided at https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py#L62, provide_context=True is set on the PythonOperator so that the "conf" passed in an Airflow REST API call (or via trigger_dag) can be read from the context.
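For reference, a minimal sketch of such a REST API call against the pre-2.0 experimental endpoint; the webserver address localhost:8080 and the dag_id "example_dag" are assumptions for illustration:

import requests

# Assumed setup: an Airflow 1.10.x webserver at localhost:8080 with the
# experimental API enabled; "example_dag" matches the DAG defined below.
response = requests.post(
    "http://localhost:8080/api/experimental/dags/example_dag/dag_runs",
    json={"conf": {"key": "value"}},  # becomes dag_run.conf inside the DAG
)
print(response.status_code, response.text)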
Additionally, key-value pairs passed as JSON in the REST API call can be accessed in a BashOperator or SparkSubmitOperator through a templated expression such as '\'{{ dag_run.conf["key"] if dag_run else "" }}\'', as shown in the example DAG below:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_dag",
    default_args={"start_date": days_ago(2), "owner": "airflow"},
    schedule_interval=None,
)

def run_this_func(**context):
    """
    Print the payload passed to the DagRun conf attribute under "key".

    :param context: The execution context
    :type context: dict
    """
    print("context", context)
    print("Remotely received value of {} for key='key'".format(context["dag_run"].conf["key"]))

# PythonOperator usage (provide_context=True makes dag_run available in the callable)
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag, provide_context=True)

# BashOperator usage: bash_command is a templated field, so dag_run.conf can be read directly
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: \'{{ dag_run.conf["key"] if dag_run else "" }}\'"',
    dag=dag,
)

# SparkSubmitOperator usage: application_args is a templated field as well
spark_conn_id = "spark_default"  # assumed id of an existing Spark connection; replace with your own
spark_task = SparkSubmitOperator(
    task_id="task_id",
    conn_id=spark_conn_id,
    name="task_name",
    application="example.py",
    application_args=[
        '--key', '\'{{ dag_run.conf["key"] if dag_run else "" }}\''
    ],
    num_executors=10,
    executor_cores=5,
    executor_memory='30G',
    # driver_memory='2G',
    conf={'spark.yarn.maxAppAttempts': 1},
    dag=dag,
)
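To exercise the DAG above from the command line, pass the key that the tasks read in the --conf payload, for example:

airflow trigger_dag 'example_dag' --conf '{"key": "value"}'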