Apache Airflow 任务卡在“up_for_retry”状态

shw*_*ill 5 airflow

我一直在我们的系统上设置一个气流集群,之前它一直在工作。我不确定我可能做了什么来改变这一点。

我有一个想要按计划运行的 DAG。为了确保它正常工作,我还想手动触发它。目前这些似乎都不起作用,并且似乎没有为任务实例写入日志。唯一可用的日志是气流调度程序日志,它们通常看起来很健康。

我只是不断地收到这条消息:

Task is not ready for retry yet but will be retried automatically. Current date is 2018-12-12T11:34:46.978355+00:00 and task will be retried at 2018-12-12T11:35:08.093313+00:00.

但是,如果我稍等一下,就会再次显示完全相同的消息,只是时间向前移动了一点。因此,该任务似乎从未真正被重试过。

我正在使用 LocalExecutor,任务是 SSHOperator。简化代码如下。它所做的只是将 ssh 连接到另一台机器上,并使用预先确定的目录结构启动一堆应用程序。:

DB_INFO_FILE = 'info.json'
START_SCRIPT = '/bin/start.sh'
TIME_IN_PAST = timezone.convert_to_utc(datetime.today() - 
timedelta(days=1))

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': TIME_IN_PAST,
    'email': ['some_email@blah.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def _extract_instance_id(instance_string):
    return re.findall(r'\d+', instance_string)[0]

def _read_file_as_json(file_name):
    with open(file_name) as open_file:
         return json.load(open_file)

DB_INFO = _read_file_as_json(DB_INFO_FILE)
CONFIG_CLIENT = ConfigDbClient(**DB_INFO)

APP_DIRS = CONFIG_CLIENT.get_values('%my-app-info%')

INSTANCE_START_SCRIPT_PATHS = {
    _extract_instance_id(instance_string): directory+START_SCRIPT
    for instance_string, directory in APP_DIRS.items()
    }

# Create an ssh hook which refers to pre-existing connection information
# setup and stored by airflow
SSH_HOOK = SSHHook(ssh_conn_id='my-conn-id')

# Create a DAG object to add tasks to
DAG = DAG('my-dag-id',
          default_args=DEFAULT_ARGS)

# Create a task for each app instance.
for instance_id, start_script in INSTANCE_START_SCRIPT_PATHS.items():
    task = SSHOperator(
        task_id='run-script-{0}'.format(instance_id),
        command='bash {0}'.format(start_script),
        ssh_hook=SSH_HOOK,
        dag=DAG)
Run Code Online (Sandbox Code Playgroud)

当我通过命令行而不是通过 UI 单独运行任务时,它可以工作。似乎我可以运行任务,但我根本无法触发 DAG 运行。我已经尝试了许多 start_date 和间隔时间表的组合,只是为了进行完整性检查。

有任何想法吗?

是的,我知道之前有人问过这个问题,我已经看过所有这些问题,但没有一个解决方案对我有帮助。

dla*_*lin 6

哦。您start_date的变化速度与计划间隔期相同或更快。

这是调度程序每隔几秒钟看到的内容:

start_date: 2018-12-11T12:12:12.000Z  # E.G. IFF now is 2018-12-12T12:12:12.000Z, a day ago
schedule_interval: timedelta(days=1)  # The default
Run Code Online (Sandbox Code Playgroud)

以下是调度程序运行 DAG 所需的内容:上次运行发生的时间超过一个调度间隔之前。如果没有预定的发生来看,第一,如果一个完整的时间表间隔已经过去了,因为运行现在可以开始start_date,因为这是为最早允许的日期execution_date。在这种情况下,dag_runexecution_date设置为间隔期间的开始应该被创建。然后task_instance可以为 DAG 中满足依赖关系的任何任务创建 s,只要 s 在 DAGtask_instance execution_date之后start_date(这不会存储在dag_run对象上,而是通过在检查 dag 状态时加载 DAG 文件来重新计算)。

所以它不会自动安排,因为开始日期在满足间隔时不断变化。但是,如果它是 -2d,则至少会安排一次运行,然后任何进一步的运行都必须等到 1d 之后才能安排。不过如果你只是datetime在你的start_date.

但是对于手动运行的那些奇怪的重试呢?

您确实开始了一两次手动运行。execution_date除非您指定其他内容,否则这些运行将使用当前时间。这应该在 之后start_date,至少到明天,这应该清除它们以运行。但是,在您的日志中,您似乎看到它们失败并被标记为重试,并且不会减少您的重试。我不确定为什么会这样,但可能是SSHOperator.

您是否安装了[ssh]额外的气流,以便在网络服务器和调度程序上满足 SSHOperator 的依赖关系(特别是paramikosshtunnel)?其中之一正在工作,因为我认为它正在解析并根据添加到数据库而显示在 UI 中。

如果你执行,你会得到什么:

airflow test my-dag-id run-script-an_instance_id 2018-12-12T12:12:12
Run Code Online (Sandbox Code Playgroud)

您知道调度程序和网络服务器正在循环重新填充他们的 DAG 包,因此每天重新运行此 DAG 文件 1000 次,重新加载该 json(它是本地访问,因此类似于导入模块),并SSHHook使用数据库查找重新创建它. 我没有看到设置这个钩子有什么花哨的事情,为什么不ssh_hook从 中删除SSHOperator并替换它,ssh_conn_id='my-conn-id'以便它可以在执行时创建一次?我怀疑这就是导致重试只是向前推进的问题。