Sub*_*ham 7 python python-3.x airflow airflow-scheduler
我在运行气流 DAG 时遇到问题。有时我在日志中收到以下消息,并且 DAG 停止执行,但有时它工作正常。我尝试用谷歌搜索该错误,但没有找到任何内容。以下是我的日志。
[2020-06-17 14:50:06,983] {__init__.py:1133} INFO - Dependencies not met for <TaskInstance: feeds__my_feed.my_feed_feed_processor 2020-06-16T12:13:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2020-06-17 14:50:06,984] {__init__.py:1133} INFO - Dependencies not met for <TaskInstance: feeds__my_feed.my_feed_feed_processor 2020-06-16T12:13:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2020-06-17 12:13:32.150826+00:00.
[2020-06-17 14:50:06,987] {logging_mixin.py:95} INFO - [2020-06-17 14:50:06,987] {jobs.py:2549} INFO - Task is not able to be run
Run Code Online (Sandbox Code Playgroud)
有关更多详细信息,我检查了{jobs.py:2549}
IN AIRFLOW 源代码并发现了以下内容。我不确定为什么它会在中间停止执行。请帮我。
def _execute(self):
self.task_runner = get_task_runner(self)
def signal_handler(signum, frame):
"""Setting kill signal handler"""
self.log.error("Received SIGTERM. Terminating subprocesses")
self.on_kill()
raise AirflowException("LocalTaskJob received SIGTERM signal")
signal.signal(signal.SIGTERM, signal_handler)
if not self.task_instance._check_and_change_state_before_execution(
mark_success=self.mark_success,
ignore_all_deps=self.ignore_all_deps,
ignore_depends_on_past=self.ignore_depends_on_past,
ignore_task_deps=self.ignore_task_deps,
ignore_ti_state=self.ignore_ti_state,
job_id=self.id,
pool=self.pool):
self.log.info("Task is not able to be run")
return
Run Code Online (Sandbox Code Playgroud)
以下是 DAG 详细信息
{'dag': <DAG: feeds__my_feed>, 'ds': '2020-06-16', 'next_ds': '2020-06-17', 'next_ds_nodash': '20200617', 'prev_ds': '2020-06-15', 'prev_ds_nodash': '20200615', 'ds_nodash': '20200616', 'ts': '2020-06-16T12:13:00+00:00', 'ts_nodash': '20200616T121300', 'ts_nodash_with_tz': '20200616T121300+0000', 'yesterday_ds': '2020-06-15', 'yesterday_ds_nodash': '20200615', 'tomorrow_ds': '2020-06-17', 'tomorrow_ds_nodash': '20200617', 'END_DATE': '2020-06-16', 'end_date': '2020-06-16', 'dag_run': <DagRun feeds__my_feed @ 2020-06-16 12:13:00+00:00: scheduled__2020-06-16T12:13:00+00:00, externally triggered: False>, 'run_id': 'scheduled__2020-06-16T12:13:00+00:00', 'execution_date': <Pendulum [2020-06-16T12:13:00+00:00]>, 'prev_execution_date': datetime.datetime(2020, 6, 15, 12, 13, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'next_execution_date': datetime.datetime(2020, 6, 17, 12, 13, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'latest_date': '2020-06-16', 'macros': <module 'airflow.macros' from '/root/.local/share/virtualenvs/app-4PlAip0Q/lib/python3.6/site-packages/airflow/macros/__init__.py'>, 'params': {}, 'tables': None, 'task': <Task(PythonOperator): my_feed_feed_processor>, 'task_instance': <TaskInstance: feeds__my_feed.my_feed_processor 2020-06-16T12:13:00+00:00 [running]>, 'ti': <TaskInstance: feeds__my_feed.my_feed_processor 2020-06-16T12:13:00+00:00 [running]>, 'task_instance_key_str': 'feeds__my_feed_feed_processor__20200616', 'conf': <module 'airflow.configuration' from '/root/.local/share/virtualenvs/app-4PlAip0Q/lib/python3.6/site-packages/airflow/configuration.py'>, 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': [],'task_id': 'my_feed_processor', 'templates_dict': None}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6627 次 |
最近记录: |