我遇到了一个问题:我的 DAG 无法导入,但我找不出原因。代码如下:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task,dag
@dag(
    dag_id="database_monitor",
    schedule_interval='*/10 * * * *',
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    """Define the ``database_monitor`` DAG (version from the question).

    NOTE: this is the failing version. It is kept as posted, with only the
    indentation restored, so that the import error can be reproduced.
    """
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        success=lambda x: x == False,
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task()
    def alert_of_db_inrecovery():
        """Post a MINOR alert to the HTTP alert API."""
        import requests

        # result = f"Former primary instance is in recovery, task_instance_key_str: {kwargs['task_instance_key_str']}"
        data = {
            "@key": "kkll",
            "@version": "alertapi-0.1",
            "@type": "ALERT",
            "object": "Testobject",
            "severity": "MINOR",
            "text": str("Former primary instance is in recovery"),
        }
        requests.post('https://httpevents.systems/api/sendAlert', verify=False, data=data)

    # BUG (the subject of the question): the decorated function is referenced
    # but never called, so the right-hand side is the ``_TaskDecorator`` object
    # itself rather than a task. Importing the DAG then fails with
    # AttributeError: '_TaskDecorator' object has no attribute 'update_relative'.
    check_db_alive >> alert_of_db_inrecovery


dag = Pipeline()
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
AttributeError: '_TaskDecorator' object has no attribute 'update_relative'
您需要调用(而不只是引用)用 @task 装饰的 Python TaskFlow 函数。被装饰的函数本身只是一个 _TaskDecorator 对象,只有调用它才会创建可以用 >> 设置依赖关系的任务。也就是说,把
check_db_alive >> alert_of_db_inrecovery
改成
check_db_alive >> alert_of_db_inrecovery()
以下是修正后的完整代码:
from airflow.sensors.sql import SqlSensor
import pendulum
from airflow.decorators import task, dag
@dag(
    dag_id="database_monitor",
    schedule_interval="*/10 * * * *",
    start_date=pendulum.datetime(2023, 7, 16, 21, 0, tz="UTC"),
    catchup=False,
)
def Pipeline():
    """Define the ``database_monitor`` DAG.

    A ``SqlSensor`` runs ``SELECT pg_is_in_recovery()`` on the ``evergreen``
    connection every 60 seconds in ``reschedule`` mode. Once its ``success``
    callable returns true, a TaskFlow task posts an alert to the HTTP alert
    API.
    """
    check_db_alive = SqlSensor(
        task_id="check_db_alive",
        conn_id="evergreen",
        sql="SELECT pg_is_in_recovery()",
        # Succeeds once the value handed to the callable compares equal to
        # False. The explicit comparison is kept on purpose so the criterion
        # is unchanged from the original.
        # NOTE(review): the sensor succeeds when the instance is NOT in
        # recovery, yet the downstream alert text says it is -- confirm the
        # intended condition.
        success=lambda x: x == False,  # noqa: E712
        poke_interval=60,
        # timeout = 60 * 2,
        mode="reschedule",
    )

    @task
    def alert_of_db_inrecovery():
        """Post a MINOR alert to the HTTP alert API.

        Raises:
            requests.RequestException: if the request times out, the
                connection fails, or the API answers with an HTTP error
                status, so an undelivered alert fails the task visibly.
        """
        import requests

        payload = {
            "@key": "kkll",
            "@version": "alertapi-0.1",
            "@type": "ALERT",
            "object": "Testobject",
            "severity": "MINOR",
            "text": "Former primary instance is in recovery",
        }
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original, but prefer pointing ``verify`` at the
        # proper CA bundle.
        response = requests.post(
            "https://httpevents.systems/api/sendAlert",
            verify=False,
            data=payload,
            # Without a timeout, requests can block the worker indefinitely.
            timeout=30,
        )
        response.raise_for_status()

    # The decorated function must be *called* to create the task; the bare
    # function object cannot be used with ``>>``.
    check_db_alive >> alert_of_db_inrecovery()


dag = Pipeline()
Run Code Online (Sandbox Code Playgroud)
参考:https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
归档时间: |
|
查看次数: |
2534 次 |
最近记录: |