相关疑难解决方法(0)

气流动态DAG和任务ID

我主要看到Airflow用于ETL/Bid数据相关的工作.我正在尝试将其用于业务工作流,其中用户操作将在未来触发一组依赖任务.其中一些任务可能需要根据某些其他用户操作进行清除(删除).我认为处理这个的最好方法是通过动态任务ID.我读到Airflow支持动态dag id.所以,我创建了一个简单的python脚本,它将DAG id和task id作为命令行参数.但是,我遇到了使它工作的问题.它给出了dag_id not found错误.有没人试过这个?这是脚本的代码(称之为tmp.py),我在命令行上执行python(python tmp.py 820 2016-08-24T22:50:00):

from __future__ import print_function
import os
import sys
import shutil
from datetime import date, datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
execution = '2016-08-24T22:20:00'
if len(sys.argv) > 2  :
   dagid =  sys.argv[1]
   taskid = 'Activate' + sys.argv[1]
   execution = sys.argv[2]
else:
   dagid = 'DAGObjectId'
   taskid = 'Activate'
default_args = {'owner' : 'airflow', 'depends_on_past': False, 'start_date':date.today(), 'email': ['fake@fake.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1}
dag = DAG(dag_id = dagid, …
Run Code Online (Sandbox Code Playgroud)

airflow

19
推荐指数
2
解决办法
2万
查看次数

标签 统计

airflow ×1