如何运行简单的气流DAG

djo*_*hon 14 python airflow

我对Airflow完全不熟悉.我想在指定的日期运行一个简单的DAG.我很难在开始日期,执行日期和回填之间做出改变.运行DAG的命令是什么?

这是我以后尝试过的:

airflow run dag_1 task_1 2017-1-23
Run Code Online (Sandbox Code Playgroud)

我第一次运行该命令时,任务执行正确,但是当我再次尝试时它没有用.

这是我跑的另一个命令:

airflow backfill dag_1 -s 2017-1-23 -e 2017-1-24
Run Code Online (Sandbox Code Playgroud)

我不知道这个命令会发生什么.DAG每天会从23点到24点执行吗?

在运行上面的两个命令之前,我这样做了:

airflow initdb
airflow scheduler 
airflow webserver -p 8085 --debug &
Run Code Online (Sandbox Code Playgroud)

这是我的DAG

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 1, 23, 12),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_1', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='create_clients',
    bash_command='Rscript /scripts/Cli.r',
    dag=dag)

t2 = BashOperator(
    task_id='create_operation',
    bash_command='Rscript Operation.r',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

屏幕截图:树视图

UPDATE

airflow run dag_1 task_1 2017-1-23T10:34
Run Code Online (Sandbox Code Playgroud)

Nec*_*ver 20

如果你用它运行一次

airflow run dag_1 task_1 2017-1-23
Run Code Online (Sandbox Code Playgroud)

保存运行并再次运行它将不会执行任何操作,您可以尝试通过强制它来重新运行它

airflow run --force=true dag_1 task_1 2017-1-23
Run Code Online (Sandbox Code Playgroud)

气流回填命令将运行在从开始日期到结束日期指定的时间段内运行的任何执行.它将取决于您在DAG上设置的计划,如果您将其设置为每小时触发它应运行24次,但它也不会重新执行先前执行的运行.

您可以清除任务,就像它从未运行一样

airflow clear dag_1 -s 2017-1-23 -e 2017-1-24
Run Code Online (Sandbox Code Playgroud)

另请查看cli文档:https://airflow.incubator.apache.org/cli.html

  • `-f` 选项即可,无需设置 `=true`。Airflow 1.9`气流运行:错误:参数-f/--force:忽略显式参数'true'` (2认同)

Pri*_*hta 9

开始日期、执行日期和回填之间的差异

回填是为了显式运行 DAG 以测试/手动运行 DAG/重新运行出错的 DAG。您使用 CLI 执行此操作

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately
Run Code Online (Sandbox Code Playgroud)

顾名思义,start_date是 DAG 定义有效的日期

execution_date是要运行的日期时间。您在测试 DAG 的各个任务时提供的,如下所示

airflow test <<dag>> <<task>> <<exec_date>>
Run Code Online (Sandbox Code Playgroud)

运行 dag 的命令是什么

Backfill是显式运行 DAG 的命令。否则,您只需将 DAG 放在 DAGBAG 文件夹中,调度程序将根据 DAG 定义中定义的调度运行它

airflow backfill -s <<start_date>> <<dag>> 
#optionally provide -1 as start_date to run it immediately
Run Code Online (Sandbox Code Playgroud)