Airflow DAG state is Success, but the tasks never actually ran

Asked by SAT*_*JEE · tag: airflow

I am using Airflow 2.3.4 and I trigger the DAG with a config. When I hard-code the config values, the DAG runs successfully, but when I trigger it while passing the config, my tasks never start, yet the run state turns green (Success). Please help me understand what is going wrong!

from datetime import datetime, timedelta
from airflow import DAG
from pprint import pprint
from airflow.operators.python import PythonOperator
from operators.jvm import JVMOperator

args = {
    'owner': 'satyam',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_params = {
    'dag_id': 'synthea_etl_end_to_end_with_config',
    'start_date': datetime.utcnow(),
    'end_date': datetime(2025, 2, 5),
    'default_args': args,
    'schedule_interval': timedelta(hours=4)
}


dag = DAG(**dag_params)

# [START howto_operator_python]


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    pprint(ds)
    return 'Whatever you return gets printed in the logs'


jvm_task = JVMOperator(
    task_id='jvm_task',
    correlation_id='123456',
    jar='/home/i1136/Synthea/synthea-with-dependencies.jar',
    options={
        'java_args': [''],
        'jar_args': [
            "-p {{ dag_run.conf['population_count'] }} "
            "--exporter.fhir.export {{ dag_run.conf['fhir'] }} "
            "--exporter.ccda.export {{ dag_run.conf['ccda'] }} "
            "--exporter.csv.export {{ dag_run.conf['csv'] }} "
            "--exporter.csv.append_mode {{ dag_run.conf['csv'] }} "
            "--exporter.baseDirectory /home/i1136/Synthea/output_dag_config"
        ]
    })

print_context_task = PythonOperator(
    task_id='print_context_task',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

jvm_task.set_downstream(print_context_task)
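For reference, a DAG like this would typically be triggered with a config through the Airflow CLI. The JSON keys below mirror the `dag_run.conf` lookups in the operator above; the values are purely illustrative:

```shell
# Illustrative trigger: the conf keys match the dag_run.conf lookups in jvm_task
airflow dags trigger synthea_etl_end_to_end_with_config \
  --conf '{"population_count": 100, "fhir": "true", "ccda": "false", "csv": "true"}'
```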

Answered by 小智 · 7 votes

The problem is `'start_date': datetime.utcnow()`, which is always >= the dag_run's start_date; in that case Airflow marks the run as successful without running it. For this parameter it is best to choose the earliest date you want runs for. If you don't have one, you can use yesterday's date, but then the next day you won't be able to re-run tasks that failed the day before:

import pendulum
dag_params = {
    ...,
    'start_date': pendulum.yesterday(),
    ...,
}
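A minimal, Airflow-free sketch of why this happens: the scheduler re-parses the DAG file continuously, so a dynamic `start_date` is re-evaluated on every parse and keeps moving forward past the run's logical date. The `parse_dag_file` helper below is purely illustrative, not an Airflow API:

```python
from datetime import datetime
import time

def parse_dag_file():
    # Each scheduler parse re-executes the DAG file, so a dynamic
    # start_date is re-evaluated and moves forward every time.
    return {'start_date': datetime.utcnow()}

run_logical_date = datetime.utcnow()   # the moment the run was triggered
time.sleep(0.01)                       # a later parse happens...
dag = parse_dag_file()                 # ...and start_date is now even later

# Airflow will not run task instances whose logical date precedes
# the DAG's start_date, so the run completes with nothing executed.
task_should_run = run_logical_date >= dag['start_date']
print(task_should_run)  # False
```

With a fixed date in the past (as in the pendulum example above), the comparison stays stable across parses and the tasks are scheduled normally.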