相关疑难解决方法(0)

气流ExternalTask​​Sensor卡住了

我正在尝试使用ExternalTask​​Sensor,并且它已经陷入了另一个已经成功完成的DAG任务.

这里,第一个DAG"a"完成其任务,之后应该触发通过ExternalTask​​Sensor的第二个DAG"b".相反,它陷入了寻找a.first_task的困境.

第一个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='a',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_first_task():
    print('First task is done')

PythonOperator(
    task_id='first_task',
    python_callable=do_first_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

第二个DAG:

import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

dag = DAG(
    dag_id='b',
    default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='a',
    external_task_id='first_task',
    dag=dag) >> \
PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

我在这里错过了什么?

python airflow

10
推荐指数
2
解决办法
7393
查看次数

气流:ExternalTask​​Sensor不会触发任务

我已经看到了对SO问题,并作出相应的更改。但是,我的依赖DAG仍然卡在戳状态。以下是我的主DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure
Run Code Online (Sandbox Code Playgroud)

以下是我的依赖DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': …
Run Code Online (Sandbox Code Playgroud)

python directed-acyclic-graphs airflow airflow-scheduler

5
推荐指数
1
解决办法
1781
查看次数