相关疑难解决方法(0)

如何在Airflow中设置DAG之间的依赖关系?

我正在使用Airflow来安排批处理作业.我有一个每晚运行的DAG(A)和每月运行一次的另一个DAG(B).B取决于A已成功完成.但是B需要很长时间才能运行,因此我希望将其保存在单独的DAG中以便更好地进行SLA报告.

如何在同一天成功运行DAG A,使DAG B运行?

python etl airflow

35
推荐指数
2
解决办法
2万
查看次数

如何使用 Python 在 Airflow 中的另一个 DAG 成功时触发 DAG?

我有一个 python DAGParent Job和 DAG Child Job。中的任务Child Job应该在成功完成Parent Job每天运行的任务时触发。如何添加外部作业触发器?

我的代码

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from utils import FAILURE_EMAILS

yesterday = datetime.combine(datetime.today() - timedelta(1), datetime.min.time())


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': yesterday,
    'email': FAILURE_EMAILS,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('Child Job', default_args=default_args, schedule_interval='@daily')

execute_notebook = PostgresOperator(
  task_id='data_sql',
  postgres_conn_id='REDSHIFT_CONN',
  sql="SELECT * FROM athena_rs.shipments limit 5",
  dag=dag
)
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs python-3.x airflow airflow-scheduler

14
推荐指数
2
解决办法
1万
查看次数

AIrflow - 跨多个文件拆分DAG定义

刚刚开始使用Airflow,并想知道构建大型DAG的最佳实践是什么.对于我们的ETL,我们有许多属于逻辑分组的任务,但这些组依赖于彼此.以下哪项被认为是最佳做法?

  • 一个大型DAG文件,其中包含该文件中的所有任务
  • 将DAG定义拆分为多个文件(如何执行此操作?)
  • 定义多个DAG,每个任务组一个,并使用ExternalTask​​Sensor设置它们之间的依赖关系

也欢迎其他建议.

airflow

12
推荐指数
2
解决办法
2120
查看次数

气流ExternalTask​​Sensor卡住了

我正在尝试使用ExternalTask​​Sensor,并且它已经陷入了另一个已经成功完成的DAG任务.

这里,第一个DAG"a"完成其任务,之后应该触发通过ExternalTask​​Sensor的第二个DAG"b".相反,它陷入了寻找a.first_task的困境.

第一个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

第二个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?

python airflow

10
推荐指数
2
解决办法
7393
查看次数

气流DAG成功回调

有没有一种优雅的方法来为DAG成功事件定义回调?我真的不想设置一个任务,它将使用on_sucess_callback在所有其他任务的上游。

谢谢!

airflow apache-airflow

6
推荐指数
1
解决办法
2856
查看次数

将运算符融合在一起

我仍在部署过程中Airflow,我已经感觉到需要 operators合并在一起。最常见的用例是运算符和相应的sensor. 例如,人们可能希望将EmrStepOperator和链接在一起EmrStepSensor


我正在DAG编程方式创建我的s ,其中最大的一个包含 150 多个(相同的)分支,每个分支对不同的数据位(表)执行相同的一系列操作。因此,将构成DAG 中单个逻辑步骤的任务放在一起会很有帮助。

以下是我项目中的 2 个相互竞争的例子,为我的论点提供动力。

1.从S3路径删除数据然后写入新数据

此步骤包括 2 个操作员

  • DeleteS3PathOperator: 扩展自BaseOperator& 使用S3Hook
  • HadoopDistcpOperator: 从 SSHOperator

2. 有条件地MSCK REPAIRHive桌子上表演

这一步包含4个操作符

  • BranchPythonOperator: 检查 Hive 表是否分区
  • MsckRepairOperator:从扩展HiveOperator和执行MSCK上(REPAIR进行分配)表
  • Dummy(Branch)Operator:使向上交替的分支路径,以MsckRepairOperator(对于非分区表)
  • Dummy(Join)Operator: …

airflow

6
推荐指数
1
解决办法
1496
查看次数

气流:ExternalTask​​Sensor不会触发任务

我已经看到了对SO问题,并作出相应的更改。但是,我的依赖DAG仍然卡在戳状态。以下是我的主DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure
Run Code Online (Sandbox Code Playgroud)

以下是我的依赖DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': …
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs airflow airflow-scheduler

5
推荐指数
1
解决办法
1781
查看次数

如何在 Airflow 中组合多个 DAG

我真的需要知道如何使用 Airflow 让多个 DAG 相互调用(例如DAG_1 >> DAG_2),类似于它如何使用上游/下游运算符在 DAG 中运行任务。我需要知道如何设置这使气流运行DAG_1,然后将运行DAG_2一次,DAG_1已成功完成。

最好我希望在 Airflow dag文件夹中拥有DAG_1DAG_2单独的文件,然后要么在运行开始时将调用作为前一步,要么有一个单独的“主”DAG,它创建并运行每个任务所在的 DAG另一个 DAG 而不是任务操作符。DAG_2DAG_1

如果有人知道如何做到这一点,请告诉我,我真的需要一些帮助。如果有人知道如何做到这一点,我也将接受在同一个文件中制作和连接两个单独的 DAG,但最好将链接的 DAG 放在不同的文件中。

预先感谢您的帮助。

python directed-acyclic-graphs python-3.x airflow

1
推荐指数
1
解决办法
3211
查看次数