Oops... AttributeError when clearing failed task state in Airflow

7yl*_*l4r 6 airflow apache-airflow

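I am trying to clear a failed task so that it will run again.

I usually do this through the web GUI, from the tree view:

[Screenshot: tree view showing the failed task and the Clear popup]

After selecting "Clear", I am directed to an error page:

[Screenshot: error page]

The traceback on that page is the same error I get when I try to clear this task using the CLI: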

[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'

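I am looking for ideas about what could be causing this, how I should fix this task, and how to avoid the problem in the future.

I was able to work around it by searching under "Browse > Task Instances" and deleting the task record, but I would still like to understand the issue, since I have run into it several times.

Although my DAG code is getting complicated, here is an excerpt showing where the operator is defined in the DAG: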

    trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
    coverage_check = BranchPythonOperator(
        task_id='coverage_check',
        python_callable=_coverage_check,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(hours=3),
        queue=QUEUE.PYCMR,
        op_kwargs={
            'roi':region,
            'success_branch_id': trigger_granule_dag_id
        }
    )

The full source code can be browsed at github/USF-IMARS/imars_dags. Here are links to the most relevant parts:

Sai*_*tam 2

Below is a sample DAG I created to reproduce the error you are facing.

import logging
import os
from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator


def athena_data_validation(**kwargs):
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'

schedule_interval = None  

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

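# Module-level client that is attached to the task through op_kwargs below.
# Removing it from here is what made `clear` work again (see the explanation after the code).
athena_client = boto3.client('athena', region_name='us-west-2')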

DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)

end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)
data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)

After one successful run, I tried to clear the Data_Validation task and got the same error.

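I removed the module-level creation of the athena_client object and created it inside the athena_data_validation function instead, and then clearing worked. So when we do a clear in the Airflow UI, Airflow tries to deepcopy the DAG, which includes every object attached to its tasks (here, the client passed through op_kwargs). I am still trying to understand why it cannot copy an object of that type, but this workaround works for me.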
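To make the workaround concrete, here is a minimal sketch that runs without Airflow or boto3. StandInClient and find_uncopyable are hypothetical names introduced only for this illustration. The stand-in holds a threading.Lock, which copy.deepcopy cannot duplicate. I am assuming the real Athena client fails for a similar reason, although the exact exception in the traceback above is different. The sketch first checks which op_kwargs values cannot be deep-copied, then shows the pattern of passing only plain values and building the client inside the callable.

```python
import copy
import threading


class StandInClient:
    """Hypothetical stand-in for a client such as boto3.client('athena').

    It holds a lock, which copy.deepcopy cannot duplicate. This is an
    assumption used for illustration, not a claim about boto3 internals.
    """

    def __init__(self, region_name):
        self.region_name = region_name
        self._lock = threading.Lock()


def find_uncopyable(op_kwargs):
    """Return the keys of op_kwargs whose values fail copy.deepcopy."""
    bad_keys = []
    for key, value in op_kwargs.items():
        try:
            copy.deepcopy(value)
        except Exception:  # deepcopy may raise TypeError, AttributeError, ...
            bad_keys.append(key)
    return bad_keys


# Problematic pattern: a live client object is stored in op_kwargs,
# so it gets deep-copied together with the task.
eager_kwargs = {
    'athena_client': StandInClient('us-west-2'),
    'sql_file': 'data_validation.sql',
}

# Workaround pattern: pass only plain, copyable values ...
lazy_kwargs = {
    'region_name': 'us-west-2',
    'sql_file': 'data_validation.sql',
}


def athena_data_validation(region_name, sql_file, **kwargs):
    # ... and build the client inside the callable, at execution time.
    client = StandInClient(region_name)
    return client.region_name, sql_file


print(find_uncopyable(eager_kwargs))  # ['athena_client']
print(find_uncopyable(lazy_kwargs))   # []
```

Running a check like find_uncopyable over a task's op_kwargs is one way to spot which argument is likely to break a clear before it happens. In the real DAG, the equivalent change would be to call boto3.client('athena', region_name='us-west-2') inside athena_data_validation rather than at module level.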