如果任何任务失败,气流回填将停止

dhr*_*shp 5 airflow airflow-scheduler

我正在使用气流 cli 的backfill命令手动运行一些回填作业。

 airflow backfill mydag -i -s 2018-01-11T16-00-00 -e 2018-01-31T23-00-00 --reset_dagruns --rerun_failed_tasks
Run Code Online (Sandbox Code Playgroud)

dag 间隔是每小时一次,大约有 40 个任务。因此,这种回填工作需要一天多的时间才能完成。我需要它在没有监督的情况下运行。但是,我注意到,即使一项任务在回填间隔中的一次运行中失败,整个回填作业也会因以下异常而停止,我必须再次手动重新启动它。

    Traceback (most recent call last):
      File "/home/ubuntu/airflow/bin/airflow", line 4, in <module>
        __import__('pkg_resources').run_script('apache-airflow==1.10.0', 'airflow')
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/pkg_resources/__init__.py"
    , line 719, in run_script
        self.require(requires)[0].run_script(script_name, ns)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/pkg_resources/__init__.py", line 1504, in run_script
        exec(code, namespace, namespace)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/EGG-INFO/scripts/airflow", line 32, in <module>
        args.func(args)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/utils/cli.py", line 74, in wrapper
        return f(*args, **kwargs)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/bin/cli.py", line 217, in backfill
        rerun_failed_tasks=args.rerun_failed_tasks,
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/models.py", line 4105, in run
        job.run()
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/jobs.py", line 202, in run
        self._execute()
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.5.egg/airflow/utils/db.py", line 74, in wrapper
        return func(*args, **kwargs)
      File "/home/ubuntu/airflow/lib/python3.5/site-packages/apache_airflow-1.10.0-py3.
    5.egg/airflow/jobs.py", line 2533, in _execute


 airflow.exceptions.AirflowException: 

Some task instances failed:
{('mydag', 'a_task', datetime.datetime(2018, 1, 30, 17, 5, tzinfo=psy
copg2.tz.FixedOffsetTimezone(offset=0, name=None)))}
Run Code Online (Sandbox Code Playgroud)

任务实例不依赖于它们之前的实例,因此我不介意一两个任务失败。我需要这份工作才能继续。

我在回填文档中找不到任何允许我指定此行为的选项。

有没有办法实现我正在寻找的东西?

Ale*_*ino -1

如果我正确理解您的问题,您寻求的行为可以通过设置来实现

'depends_on_past': False
Run Code Online (Sandbox Code Playgroud)

在 DAG 参数中。

来源: https: //airflow.incubator.apache.org/tutorial.html#backfill