Airflow - Broken DAG - Timeout

dar*_*rse 5 airflow airflow-scheduler

I have a DAG that executes a function which connects to a Postgres DB, deletes the contents of a table, and then inserts a new data set.

I am trying this locally, and I see that when I attempt to run it, the webserver takes a long time to connect and in most cases never succeeds. However, as part of the connection process it appears to be executing queries against the backend. Because the file contains a delete function, I can see that data has been deleted from the table even though I never scheduled the script or triggered it manually (essentially one of the functions was executed). Can someone advise what I am doing wrong here?
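For context on why this can happen: Airflow's scheduler and webserver re-import every file in the DAG folder on a regular cycle, so any statement at module level runs at parse time, not at task run time. A minimal sketch of the trap (the function name is illustrative, not from the DAG below):

def clear_table():
    # stand-in for a real DELETE statement
    print("table cleared")

clear_table()   # module-level call: executes on every parse of the file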

One error that pops up in the UI is:

Broken DAG: [/Users/user/airflow/dags/dwh_sample23.py] Timeout

An 'i' icon is also visible next to the DAG ID in the UI, indicating that the DAG is not present in the webserver's DagBag object. The code I am using is given below:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' This function connects to the Data Warehouse and returns the cursor to execute queries '''
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect(" dbname = 'dbname' user = 'user' password = 'password' host = 'hostname' port = '5439' sslmode = 'require' ")
    except:
        print("I am unable to connect to the database.")
    print('Success')
    return(dwh_connection)


def tbl1_del():
    ''' This function clears all rows from tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute("""DELETE FROM tbl1;""")
    dwh_connection.commit()


def pop_tbl1():
    ''' This function populates all rows in tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute(""" INSERT INTO tbl1
    select id,name,price from tbl2;""")
    dwh_connection.commit()


db_login()
tbl1_del()
pop_tbl1()
dwh_connection.close()

##########################################


t1 = BashOperator(
    task_id='DB_Connect',
    python_callable=db_login(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)

t2 = BashOperator(
    task_id='del',
    python_callable=tbl1_del(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)

t3 = BashOperator(
    task_id='populate',
    python_callable=pop_tbl1(),
    bash_command='python3 ~/airflow/dags/dwh_sample23.py',
    dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)

Can someone assist? Thank you.

gru*_*uby 1

Instead of using BashOperator, you can use PythonOperator and pass db_login, tbl1_del and pop_tbl1 as the python_callable. Note that the callable must be passed as a function object (db_login, without parentheses): writing python_callable=db_login() calls the function while the file is being parsed, which is exactly why the table was being cleared without a run ever being triggered. The module-level calls to these functions need to go for the same reason:

## Third party Library Imports
import pandas as pd
import psycopg2
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from sqlalchemy import create_engine
import io


# Following are defaults which can be overridden later on
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 21),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG('dwh_sample23', default_args=default_args)


#######################
## Login to DB

def db_login():
    ''' Connect to the Data Warehouse and return the connection used to execute queries '''
    global dwh_connection
    try:
        dwh_connection = psycopg2.connect("dbname='dbname' user='user' password='password' host='hostname' port='5439' sslmode='require'")
        print('Success')
    except psycopg2.Error:
        print("I am unable to connect to the database.")
        raise  # let Airflow mark the task as failed instead of continuing
    return dwh_connection


def tbl1_del():
    ''' Clear all rows from tbl1 '''
    cur = dwh_connection.cursor()
    cur.execute("""DELETE FROM tbl1;""")
    dwh_connection.commit()


def pop_tbl1():
    ''' Populate tbl1 from tbl2 '''
    cur = dwh_connection.cursor()
    cur.execute("""INSERT INTO tbl1
                   SELECT id, name, price FROM tbl2;""")
    dwh_connection.commit()

##########################################

# No module-level calls to db_login()/tbl1_del()/pop_tbl1() here:
# top-level code runs on every parse of this file, which is what was
# clearing the table and timing out the import.

t1 = PythonOperator(
    task_id='DB_Connect',
    python_callable=db_login,   # pass the function itself; do not call it
    dag=dag)

t2 = PythonOperator(
    task_id='del',
    python_callable=tbl1_del,
    dag=dag)

t3 = PythonOperator(
    task_id='populate',
    python_callable=pop_tbl1,
    dag=dag)


t1.set_downstream(t2)
t2.set_downstream(t3)
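One caveat about the code above: Airflow usually runs each task in its own process (and possibly on a different worker), so the global dwh_connection opened by the DB_Connect task will not exist when the del and populate tasks execute. A safer pattern, sketched below on the assumption that each callable can open and close its own connection via the existing db_login() helper, is to make every task self-contained; the separate DB_Connect task then becomes unnecessary:

def tbl1_del():
    ''' Clear all rows from tbl1, using a connection owned by this task. '''
    connection = db_login()            # open a fresh connection in this process
    try:
        cur = connection.cursor()
        cur.execute("DELETE FROM tbl1;")
        connection.commit()
    finally:
        connection.close()             # always release the connection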

  • Thank you. I am now facing the following error: ERROR [root] Process timed out ERROR [airflow.models.DagBag] Failed to import: /Users/user/airflow/dags/dwh_sample23.py Traceback (most recent call last): File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/models.py", line 263, in process_file File "/Users/user/anaconda3/lib/python3.6/site-packages/airflow/utils/timeout.py", line 38, in handle_timeout raise AirflowTaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: Timeout. Could you please help? Thanks. (2)
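That traceback is raised while the DagBag is importing the file, before any task runs, so it points at module-level work exceeding the parser's time budget (dagbag_import_timeout in the [core] section of airflow.cfg). If the module-level db_login()/tbl1_del()/pop_tbl1() calls are still in the file, the usual remedy is to remove them, or to guard them so they only run when the script is executed directly. A minimal sketch, assuming the fixed DAG above:

# Keep the DAG file cheap to import: no network I/O at module level.
# This block only runs for `python3 dwh_sample23.py`, never when the
# scheduler or webserver parses the file.
if __name__ == "__main__":
    db_login()        # sets the global dwh_connection
    tbl1_del()
    pop_tbl1()
    dwh_connection.close()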