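I'm using Airflow 2.3.4 and triggering my DAG with a config. When I hard-code the config values, the DAG runs successfully. But when I trigger it and pass the config, my tasks never start, yet the run's status turns green (success). Please help me understand what is going wrong!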
from datetime import datetime, timedelta
from airflow import DAG
from pprint import pprint
from airflow.operators.python import PythonOperator
from operators.jvm import JVMOperator

args = {
    'owner': 'satyam',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_params = {
    'dag_id': 'synthea_etl_end_to_end_with_config',
    'start_date': datetime.utcnow(),
    'end_date': datetime(2025, 2, 5),
    'default_args': args,
    'schedule_interval': timedelta(hours=4)
}

dag = DAG(**dag_params)

# [START howto_operator_python]
def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    pprint(ds)
    return 'Whatever you return gets printed in the logs'

jvm_task = JVMOperator(
    task_id='jvm_task',
    correlation_id='123456',
    jar='/home/i1136/Synthea/synthea-with-dependencies.jar',
    options={
        'java_args': [''],
        'jar_args': ["-p {{ dag_run.conf['population_count'] }} --exporter.fhir.export {{ dag_run.conf['fhir'] }} --exporter.ccda.export {{ dag_run.conf['ccda'] }} --exporter.csv.export {{ dag_run.conf['csv'] }} --exporter.csv.append_mode {{ dag_run.conf['csv'] }} --exporter.baseDirectory /home/i1136/Synthea/output_dag_config" ]
    })

print_context_task = PythonOperator(task_id='print_context_task', provide_context=True, python_callable=print_context, dag=dag)

jvm_task.set_downstream(print_context_task)
小智 7
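The problem is 'start_date': datetime.utcnow(). It is re-evaluated every time the DAG file is parsed, so it is always >= the dag_run's start date, and in that case Airflow marks the run as successful without running any of its tasks. For this parameter it is best to pick a fixed date no later than your earliest run. If you don't have one, you can use yesterday's date, but then on the following day you won't be able to re-run tasks that failed the day before: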
import pendulum

dag_params = {
    # ... (other parameters unchanged)
    'start_date': pendulum.yesterday(),
    # ... (other parameters unchanged)
}
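To illustrate why a moving start_date produces a "successful" run with no tasks, here is a minimal sketch in plain Python. It does not use Airflow; `task_is_eligible` and `simulate` are hypothetical helpers that only model the comparison described above (a task is considered only if the run's logical date is on or after the task's start_date), and the fixed date is an arbitrary example.

```python
from datetime import datetime, timedelta, timezone

# Arbitrary fixed date used for illustration (an assumption, not from the question).
STATIC_START_DATE = datetime(2022, 1, 1, tzinfo=timezone.utc)


def task_is_eligible(task_start_date, logical_date):
    """Simplified model of the check: run the task only if the run's
    logical date is not earlier than the task's start_date."""
    return logical_date >= task_start_date


def simulate(parse_delay_seconds=5):
    """Model a manual trigger followed by a later re-parse of the DAG file."""
    # The run is created when the DAG is triggered.
    triggered_at = datetime.now(timezone.utc)
    # The DAG file is parsed again a little later, so a start_date computed
    # with "now" at parse time lands after the run's logical date.
    reparsed_at = triggered_at + timedelta(seconds=parse_delay_seconds)
    return {
        "dynamic": task_is_eligible(reparsed_at, triggered_at),
        "static": task_is_eligible(STATIC_START_DATE, triggered_at),
    }


if __name__ == "__main__":
    print(simulate())
```

With the moving start_date the task is never eligible for the triggered run, while with the fixed date it is. In a real DAG the equivalent change would be to replace datetime.utcnow() with a constant date in both args and dag_params.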