我正在使用Airflow来安排批处理作业.我有一个每晚运行的DAG(A)和每月运行一次的另一个DAG(B).B取决于A已成功完成.但是B需要很长时间才能运行,因此我希望将其保存在单独的DAG中以便更好地进行SLA报告.
如何在同一天成功运行DAG A,使DAG B运行?
我有一个 python DAGParent Job和 DAG Child Job。中的任务Child Job应该在成功完成Parent Job每天运行的任务时触发。如何添加外部作业触发器?
我的代码
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS
yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': yesterday,
'email': FAILURE_EMAILS,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')
execute_notebook = PostgresOperator(
task_id='data_sql',
postgres_conn_id='REDSHIFT_CONN',
sql="SELECT * FROM athena_rs.shipments limit 5",
dag=dag
)
Run Code Online (Sandbox Code Playgroud) python directed-acyclic-graphs python-3.x airflow airflow-scheduler
刚刚开始使用Airflow,并想知道构建大型DAG的最佳实践是什么.对于我们的ETL,我们有许多属于逻辑分组的任务,但这些组依赖于彼此.以下哪项被认为是最佳做法?
也欢迎其他建议.
我正在尝试使用ExternalTaskSensor,并且它已经陷入了另一个已经成功完成的DAG任务.
这里,第一个DAG"a"完成其任务,之后应该触发通过ExternalTaskSensor的第二个DAG"b".相反,它陷入了寻找a.first_task的困境.
第一个DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
第二个DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
我在这里错过了什么?
有没有一种优雅的方法来为DAG成功事件定义回调?我真的不想设置一个任务,它将使用on_sucess_callback在所有其他任务的上游。
谢谢!
我仍在部署过程中Airflow,我已经感觉到需要将 operators合并在一起。最常见的用例是将运算符和相应的sensor. 例如,人们可能希望将EmrStepOperator和链接在一起EmrStepSensor。
我正在DAG以编程方式创建我的s ,其中最大的一个包含 150 多个(相同的)分支,每个分支对不同的数据位(表)执行相同的一系列操作。因此,将构成DAG 中单个逻辑步骤的任务放在一起会很有帮助。
以下是我项目中的 2 个相互竞争的例子,为我的论点提供动力。
1.从S3路径删除数据然后写入新数据
此步骤包括 2 个操作员
DeleteS3PathOperator: 扩展自BaseOperator& 使用S3HookHadoopDistcpOperator: 从 SSHOperator2. 有条件地MSCK REPAIR在Hive桌子上表演
这一步包含4个操作符
BranchPythonOperator: 检查 Hive 表是否分区MsckRepairOperator:从扩展HiveOperator和执行MSCK上(REPAIR进行分配)表Dummy(Branch)Operator:使向上交替的分支路径,以MsckRepairOperator(对于非分区表)Dummy(Join)Operator: …我已经看到了这和这对SO问题,并作出相应的更改。但是,我的依赖DAG仍然卡在戳状态。以下是我的主DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator
today = datetime.today()
default_args = {
'depends_on_past': False,
'retries': 0,
'start_date': datetime(today.year, today.month, today.day),
'schedule_interval': '@once'
}
dag = DAG('call-procedure-and-bash', default_args=default_args)
call_procedure = JdbcOperator(
task_id='call_procedure',
jdbc_conn_id='airflow_db2',
sql='CALL AIRFLOW.TEST_INSERT (20)',
dag=dag
)
call_procedure
Run Code Online (Sandbox Code Playgroud)
以下是我的依赖DAG:
from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor
today = datetime.today()
default_args = {
'depends_on_past': False,
'retries': …Run Code Online (Sandbox Code Playgroud) 我真的需要知道如何使用 Airflow 让多个 DAG 相互调用(例如DAG_1 >> DAG_2),类似于它如何使用上游/下游运算符在 DAG 中运行任务。我需要知道如何设置这使气流运行DAG_1,然后将运行DAG_2一次,DAG_1已成功完成。
最好我希望在 Airflow dag文件夹中拥有DAG_1和DAG_2单独的文件,然后要么在运行开始时将调用作为前一步,要么有一个单独的“主”DAG,它创建并运行每个任务所在的 DAG另一个 DAG 而不是任务操作符。DAG_2DAG_1
如果有人知道如何做到这一点,请告诉我,我真的需要一些帮助。如果有人知道如何做到这一点,我也将接受在同一个文件中制作和连接两个单独的 DAG,但最好将链接的 DAG 放在不同的文件中。
预先感谢您的帮助。