airflow run fails, but airflow test works, and I can't work out why


Before asking this question I went through the docs and looked at the difference between airflow run and airflow test to see if I could figure out why this is happening.

I have a few DAGs, all of which use the LocalExecutor. Two of them use an SSHOperator and one only runs tasks locally. I have tried airflow run <some_task_id> <some_execution_date> and airflow trigger_dag <dag_id>, and both fail. However, airflow test <task_id> <execution_date> works. I should stress that this is also the case for the DAG whose tasks only run locally.
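Concretely, the invocations I mean look like the following, shown here against one of the tasks from the DAG further down (the run command also appears verbatim in the output at the end of this post):

airflow run solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14        # fails
airflow trigger_dag solr-mentions-cleanups                                        # fails
airflow test solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14       # works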

There seems to be a lot of confusion around start_date in DAGs and how it relates to schedule_interval. All my DAGs have a static start_date set to a recent time, and for sanity I have schedule_interval='* * * * *' so that they run every minute (it is a lightweight task). When a DAG runs, the task either just runs and fails if retries is set to 0, or gets stuck in the retry state if retries > 0, with very little feedback each minute. In either case, the task instance details I get are:

Task instance's dagrun did not exist: Unknown reason.

or

Task is not ready for retry yet but will be retried automatically. Current date is 2019-02-20T12:30:35.381668+00:00 and task will be retried at 2019-02-20T12:31:21.492310+00:00.

There are no logs in the UI, even though the log location is specified in the config file and the task instance details page tells me where they should be.
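For completeness, the log configuration in my airflow.cfg is along these lines; the path shown here is illustrative rather than my exact value:

[core]
base_log_folder = /var/log/airflow
# task logs are expected under <base_log_folder>/<dag_id>/<task_id>/<execution_date>/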

Here is an example of one of the DAGs:

import json
import re
from os.path import expanduser
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import timezone
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from bw_config_tools.connect.bw_config import ConfigDbClient

CONFIG_DB_INFO = '/etc/airflow/config_db_info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime(2019, 2, 14, 15, 00))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['example@domain.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
    return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
    with open(file_name) as open_file:
        return json.load(open_file)

DB_INFO = _read_file_as_json(CONFIG_DB_INFO)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

print('Config DB client: {0}'.format(CONFIG_CLIENT))

APP_DIRS = CONFIG_CLIENT.get_values('%solr-mentions-cleanup.[0-9]+.dir%', strictness='similar')

INSTANCE_START_SCRIPT_PATHS = {
    _extract_instance_id(instance_string): directory+START_SCRIPT
    for instance_string, directory in APP_DIRS.items()
    }

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='solr-mentions-cleanups', key_file='/home/airflow/.ssh/id_rsa')

# Create a DAG object to add tasks to
DAG = DAG('solr-mentions-cleanups',
          default_args=DEFAULT_ARGS,
          schedule_interval='* * * * *'
          )

DAG.catchup = False

# Create a task for each solr-mentions-cleanup instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-solr-mentions-cleanups-{0}'.format(instance_id),
        command='bash {0}  disabled-queries --delete'.format(start_script),
        ssh_hook=SSH_HOOK,
        dag=DAG)

And here is the output from running the task:

(venv) airflow@some_host ~ # airflow run solr-mentions-cleanups run-solr-mentions-cleanups-0 2019-02-14
[2019-02-20 12:38:51,313] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
[2019-02-20 12:38:51,313] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16375
[2019-02-20 12:38:51,491] {__init__.py:42} DEBUG - Cannot import  due to  doesn't look like a module path
[2019-02-20 12:38:51,645] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-02-20 12:38:51,654] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f0364fdc8c8> to pre execution callback
[2019-02-20 12:38:51,930] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0364fdc8c8>]
[2019-02-20 12:38:51,974] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16375)
[2019-02-20 12:38:51,974] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
[2019-02-20 12:38:51,976] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags
[2019-02-20 12:38:51,978] {models.py:360} INFO - File /etc/airflow/dags/__init__.py assumed to contain no DAGs. Skipping.
[2019-02-20 12:38:51,978] {models.py:363} DEBUG - Importing /etc/airflow/dags/hbase-exports.py
[2019-02-20 12:38:51,983] {models.py:501} DEBUG - Loaded DAG <DAG: hbase-daily-export>
[2019-02-20 12:38:51,984] {models.py:363} DEBUG - Importing /etc/airflow/dags/test_dag.py
[2019-02-20 12:38:51,985] {models.py:501} DEBUG - Loaded DAG <DAG: test_dag>
[2019-02-20 12:38:51,986] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py
Creating dag
Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f032739b4e0>
The key file given is /home/airflow/.ssh/id_rsa
[2019-02-20 12:38:52,196] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
extra connection info given:
Key file in extra options:  None
SSH config file being used is  /home/airflow/.ssh/config
[2019-02-20 12:38:52,198] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
[2019-02-20 12:38:52,251] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxxxx.net
[2019-02-20 12:38:54,026] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
[2019-02-20 12:38:54,027] {settings.py:174} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=3600, pid=16453
[2019-02-20 12:38:54,207] {__init__.py:42} DEBUG - Cannot import  due to  doesn't look like a module path
[2019-02-20 12:38:54,362] {__init__.py:51} INFO - Using executor LocalExecutor
[2019-02-20 12:38:54,371] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f4345cfa8c8> to pre execution callback
[2019-02-20 12:38:54,622] {cli_action_loggers.py:64} DEBUG - Calling callbacks: [<function default_action_log at 0x7f4345cfa8c8>]
[2019-02-20 12:38:54,658] {settings.py:146} DEBUG - Setting up DB connection pool (PID 16453)
[2019-02-20 12:38:54,658] {settings.py:154} DEBUG - settings.configure_orm(): Using NullPool
[2019-02-20 12:38:54,660] {models.py:273} INFO - Filling up the DagBag from /etc/airflow/dags/solr_mentions_cleanup.py
[2019-02-20 12:38:54,662] {models.py:363} DEBUG - Importing /etc/airflow/dags/solr_mentions_cleanup.py

Config DB client: <bw_config_tools.connect.bw_config.ConfigDbClient object at 0x7f4308b5dc50>
The key file given is /home/airflow/.ssh/id_rsa
[2019-02-20 12:38:54,909] {base_hook.py:83} INFO - Using connection to: id: solr-mentions-cleanups. Host: some_host, Port: None, Schema: None, Login: some_user, Password: None, extra: {}
extra connection info given:
Key file in extra options:  None
SSH config file being used is  /home/airflow/.ssh/config
[2019-02-20 12:38:54,912] {models.py:501} DEBUG - Loaded DAG <DAG: solr-mentions-cleanups>
[2019-02-20 12:38:54,961] {cli.py:520} INFO - Running <TaskInstance: solr-mentions-cleanups.run-solr-mentions-cleanups-0 2019-02-14T00:00:00+00:00 [success]> on host xxxx.net
[2019-02-20 12:38:55,054] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
[2019-02-20 12:38:55,054] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16453)
[2019-02-20 12:38:56,310] {cli_action_loggers.py:81} DEBUG - Calling callbacks: []
[2019-02-20 12:38:56,313] {settings.py:201} DEBUG - Disposing DB connection pool (PID 16375)

Also, this used to work consistently until I had to restart everything on the server it runs on, so I'm fairly confident this is very close to a version that did work.

Any ideas what I'm doing wrong?

Also, just for reference, we have this connected to a postgres database running on another server. I've confirmed that the task state is definitely being updated, which confirms there are no connection problems with the metadata database.
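The metadata database connection is configured in airflow.cfg roughly like this; the host, credentials and database name below are placeholders, not the real values:

[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:***@db-host.example.com:5432/airflow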