气流动态DAG和任务ID

Dea*_*Sha 19 airflow

我主要看到Airflow用于ETL/Bid数据相关的工作.我正在尝试将其用于业务工作流,其中用户操作将在未来触发一组依赖任务.其中一些任务可能需要根据某些其他用户操作进行清除(删除).我认为处理这个的最好方法是通过动态任务ID.我读到Airflow支持动态dag id.所以,我创建了一个简单的python脚本,它将DAG id和task id作为命令行参数.但是,我遇到了使它工作的问题.它给出了dag_id not found错误.有没人试过这个?这是脚本的代码(称之为tmp.py),我在命令行上执行python(python tmp.py 820 2016-08-24T22:50:00):

from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2  :
   dagid =  sys.argv[1]
   taskid = 'Activate' + sys.argv[1]
   execution = sys.argv[2]
else:
   dagid = 'DAGObjectId'
   taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['fake@fake.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid,
       default_args=default_args,
       schedule_interval='@once',
      )
 globals()[dagid] = dag
task1 = BashOperator(
    task_id = taskid,
    bash_command='ls -l',
    dag=dag)

fakeTask = BashOperator(
    task_id = 'fakeTask',
    bash_command='sleep 5',
    retries = 3,
    dag=dag)
task1.set_upstream(fakeTask)

airflowcmd = "airflow run " + dagid + " " + taskid + "  " + execution
print("airflowcmd = " + airflowcmd)
os.system(airflowcmd)
Run Code Online (Sandbox Code Playgroud)

Dea*_*Sha 19

经过多次试验和错误后,我才弄明白这一点.希望它会帮助某人.以下是它的工作原理:您需要有一个迭代器或一个外部源(文件/数据库表)来通过模板动态生成dags/task.您可以将dag和任务名称保持为静态,只需动态分配它们即可将一个dag与另一个dag区分开来.你把这个python脚本放在dags文件夹中.启动气流调度程序时,它会在每个心跳上运行此脚本,并将DAG写入数据库中的dag表.如果已经写了一个dag(唯一的dag id),它就会跳过它.调度程序还会查看各个DAG的计划,以确定哪个DAG可以执行.如果DAG已准备好执行,它将执行它并更新其状态.这是一个示例代码:

from airflow.operators import PythonOperator
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import sys
import time

dagid   = 'DA' + str(int(time.time()))
taskid  = 'TA' + str(int(time.time()))

input_file = '/home/directory/airflow/textfile_for_dagids_and_schedule'

def my_sleeping_function(random_base):
    '''This is a function that will run within the DAG execution'''
    time.sleep(random_base)

def_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(), 'email_on_failure': False,                
    'retries': 1, 'retry_delay': timedelta(minutes=2)
}
with open(input_file,'r') as f:
    for line in f:
        args = line.strip().split(',')
    if len(args) < 6:
        continue
    dagid = 'DAA' + args[0]
    taskid = 'TAA' + args[0]
    yyyy    = int(args[1])
    mm      = int(args[2])
    dd      = int(args[3])
    hh      = int(args[4])
    mins    = int(args[5])
    ss      = int(args[6])
    dag = DAG(
        dag_id=dagid, default_args=def_args,
        schedule_interval='@once', start_date=datetime(yyyy,mm,dd,hh,mins,ss)
        )

    myBashTask = BashOperator(
        task_id=taskid,
        bash_command='python /home/directory/airflow/sendemail.py',
        dag=dag)

    task2id = taskid + '-X'

    task_sleep = PythonOperator(
        task_id=task2id,
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': 10},
        dag=dag)

    task_sleep.set_upstream(myBashTask)

f.close()
Run Code Online (Sandbox Code Playgroud)

  • 你不需要这样做:globals()[dag_id] = dag以确保所有的dag都可用吗? (3认同)
  • 谢谢你分享你的代码!你有什么时间问题,因为调度程序会在每次心跳上运行所有这些步骤吗? (2认同)
  • @ dean-sha我会将输入文件创建为上游任务.这样,每个作业只运行一次,而不是每次心跳. (2认同)

小智 15

如何动态创建DAG?:

对于在其全局命名空间中包含DAG对象的模块,Airflow会在您[sic] DAGS_FOLDER中查找,并在DagBag中添加它找到的对象.知道这一切我们需要的是一种在全局命名空间中动态分配变量的方法,这可以在python中使用globals()函数轻松完成,标准库的行为就像一个简单的字典.

for i in range(10):
    dag_id = 'foo_{}'.format(i)
    globals()[dag_id] = DAG(dag_id)
    # or better, call a function that returns a DAG object!
Run Code Online (Sandbox Code Playgroud)

  • 问题是范围(10).如果它不是10但是只有在运行时才知道的未知值会怎么样? (13认同)
  • @Programmer120我有类似的情况,我需要在循环中创建一个运算符实例。然而,“no_of_count”是在运行时决定的。当我在同一 DAG 的运行时初始化该常量时,我​​的 DAG 任务陷入未知状态。为了解决这个问题,我在另一个 DAG 运行时计算该常量,并将它们保存在变量上,并在当前 DAG 上读取该变量的值。希望这可以帮助。 (3认同)