我有一个超级简单的测试DAG,看起来像这样:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
DAG = DAG(
dag_id='scheduler_test_dag',
start_date=datetime(2017, 9, 9, 4, 0, 0, 0), #..EC2 time. Equal to 11pm hora México
max_active_runs=1,
schedule_interval='@once' #externally triggered
)
def ticker_function():
with open('/tmp/ticker', 'a') as outfile:
outfile.write('{}\n'.format(datetime.now()))
time_ticker = PythonOperator(
task_id='time_ticker',
python_callable=ticker_function,
dag=DAG
)
Run Code Online (Sandbox Code Playgroud)
自升级到apache-airflowv1.9 以来,此DAG已挂起且无法运行.深入研究调度程序日志,我发现了错误跟踪:
[2018-02-12 17:03:06,259] {jobs.py:1754} INFO - DAG(s) dict_keys(['scheduler_test_dag']) retrieved from /home/ubuntu/airflow/dags/scheduler_test_dag.py
[2018-02-12 17:03:06,315] {jobs.py:1386} INFO - Processing scheduler_test_dag
[2018-02-12 17:03:06,320] {jobs.py:379} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 371, in helper
pickle_dags)
File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1792, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 1388, in _process_dags
dag_run = self.create_dag_run(dag)
File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/airflow/jobs.py", line 807, in create_dag_run
if next_start <= now:
TypeError: unorderable types: NoneType() <= datetime.datetime()
Run Code Online (Sandbox Code Playgroud)
这个错误来自哪里?我唯一可以想到的是,scheduler_interval='@once'自从v1.9升级以来,这个DAG与我服务器上另一个破坏的DAG有一个共同之处.否则它是有史以来最基本的DAG - 似乎不应该有问题.以前我在切换到apache-airflowrepo 之前使用了基本的pip安装.
这是Web UI的屏幕截图.一切似乎都正常工作,除了顶部和底部DAGS,其调度间隔设置为@once并且无限期挂起:
有什么想法吗?