我在气流中有一个SubDAG,具有长时间运行的步骤(通常约2小时,但它根据运行的单位而变化).在1.7.1.3下,当步骤内的所有步骤都成功时,此步骤将始终导致AIRFLOW-736并且SubDAG将在"运行"状态中停止.我们可以通过手动将SubDagOperator标记为数据库中的成功(而不是运行)来解决SubDAG之后的步骤.
我们现在正在测试Airflow 1.8.1,通过执行以下操作进行升级:
在系统原本不受影响的情况下,相同的DAG现在在长时间运行的任务达到1小时标记之后大致失败100%的时间(尽管奇怪的是,不是3600秒之后 - 它可能是在30到90秒之后的任何地方.小时刻度),消息"执行程序报告任务实例已完成(失败),尽管任务说它正在运行.任务是否被外部杀死?".但是,任务本身继续在工作者身上继续运行.不知何故,尽管实际任务运行良好,但是在调度任务失败(参见jobs.py的这一行)时,调度程序之间存在分歧.
我已经确认,不知何故,气流数据库的task_instance表中的状态是"失败".因此,我想知道在任务本身仍在运行时可能将任务状态设置为失败的原因.
这是一个触发问题的示例dag:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
DEFAULT_ARGS = {'owner': 'jdoe', 'start_date': datetime(2017, 05, 30)}
def define_sub(dag, step_name, sleeptime):
op = BashOperator(
task_id=step_name, bash_command='sleep %i' % sleeptime,queue="model", dag=dag
)
return dag
def gen_sub_dag(parent_name, step_name, sleeptime):
sub = DAG(dag_id='%s.%s' % (parent_name, step_name), default_args=DEFAULT_ARGS)
define_sub(sub, step_name, sleeptime)
return sub
long_runner_parent = DAG(dag_id='long_runner', default_args=DEFAULT_ARGS, schedule_interval=None)
long_sub_dag = SubDagOperator(
subdag=gen_sub_dag('long_runner', 'long_runner_sub', 7500), task_id='long_runner_sub', dag=long_runner_parent
)
Run Code Online (Sandbox Code Playgroud)