气流 - SubDag中长时间运行的任务在一小时后标记为失败

J. *_*Doe 7 python airflow

我在气流中有一个SubDAG,具有长时间运行的步骤(通常约2小时,但它根据运行的单位而变化).在1.7.1.3下,当步骤内的所有步骤都成功时,此步骤将始终导致AIRFLOW-736并且SubDAG将在"运行"状态中停止.我们可以通过手动将SubDagOperator标记为数据库中的成功(而不是运行)来解决SubDAG之后的步骤.

我们现在正在测试Airflow 1.8.1,通过执行以下操作进行升级:

  1. 打倒我们的调度员和工人
  2. 通过pip,卸载气流并安装apache-airflow(版本1.8.1)
  3. 运行气流升级了
  4. 运行气流调度程序和工作人员

在系统原本不受影响的情况下,相同的DAG现在在长时间运行的任务达到1小时标记之后大致失败100%的时间(尽管奇怪的是,不是3600秒之后 - 它可能是在30到90秒之后的任何地方.小时刻度),消息"执行程序报告任务实例已完成(失败),尽管任务说它正在运行.任务是否被外部杀死?".但是,任务本身继续在工作者身上继续运行.不知何故,尽管实际任务运行良好,但是在调度任务失败(参见jobs.py的这一行)时,调度程序之间存在分歧.

我已经确认,不知何故,气流数据库的task_instance表中的状态是"失败".因此,我想知道在任务本身仍在运行时可能将任务状态设置为失败的原因.

这是一个触发问题的示例dag:

from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}

def define_sub(dag, step_name, sleeptime):
    op = BashOperator(
        task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
    )
    return dag

def gen_sub_dag(parent_name, step_name, sleeptime):
    sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
    define_sub(sub, step_name, sleeptime)
    return sub

long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)

long_sub_dag = SubDagOperator(
    subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
Run Code Online (Sandbox Code Playgroud)

Bol*_*uin 1

如果您确实正在使用 Celery 和 Redis 运行,请查看Celery 的可见性超时设置,并将其增加到任务的预期结束时间之外。

尽管我们将 Celery 配置为tasks-ack-late,但它仍然存在任务消失的问题。我们认为这是Celery 中的一个错误