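So I have a test DAG with a single task: a simple ETL job that extracts data from an MSSQL database and loads it into a Postgres database. It works by selecting one day at a time and inserting each day into Postgres, covering the past 360 days. But after roughly 10 days (iterations), the select statement times out.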
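from datetime import datetime, timedelta

def get_receiveCars(**kwargs):
    # get the current date (the original assigned end_date but then used an undefined `today`)
    today = datetime.now()
    # loop over the past 360 days, one day per iteration
    for x in range(360):
        startDate = today - timedelta(days=x)
        # clear that day's rows in Postgres, then re-select them from MSSQL
        delete_dataPostgres(startDate.strftime('%Y-%m-%d'), "received sample")
        select_dataMsql(startDate)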
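A minimal sketch, assuming the intent is exactly one calendar day per iteration: `daily_windows` is a hypothetical helper (not part of the original DAG) that yields midnight-aligned, half-open `[start, end)` windows. Compared with `datetime.now() - timedelta(days=x)`, it drops the time-of-day part, and the `end` bound (next midnight) avoids hard-coding `23:59:59`.

```python
from datetime import datetime, timedelta
from typing import Iterator, Tuple


def daily_windows(today: datetime, days: int = 360) -> Iterator[Tuple[datetime, datetime]]:
    """Yield (start, end) pairs for the past `days` days, newest first.

    Each window is half-open, [midnight, next midnight), so a filter of the form
    `ReceivedDateTime >= start AND ReceivedDateTime < end` covers the whole day.
    """
    # Drop the time-of-day part so every window starts exactly at midnight.
    midnight = today.replace(hour=0, minute=0, second=0, microsecond=0)
    for x in range(days):
        start = midnight - timedelta(days=x)
        yield start, start + timedelta(days=1)
```

For example, `list(daily_windows(datetime(2021, 3, 10, 14, 30), days=2))` gives the windows for 10 March and 9 March 2021, each running from midnight to the following midnight.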
The select statement is:
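# Airflow 2 provider path; Airflow 1.10 used airflow.hooks.mssql_hook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook

def select_dataMsql(startDate):
    # upper bound: the last second of startDate's day
    # (note: `< 'YYYY-MM-DD 23:59:59'` excludes rows in that final second)
    endDate = startDate.strftime('%Y-%m-%d') + " 23:59:59"
    ms_hook = MsSqlHook(mssql_conn_id='mssql_db')
    select_sql = """select carColor, carBrand, fuelType, COUNT(DISTINCT RequestID) AS received
        FROM Requests
        where
        ReceivedDateTime >= %s
        AND ReceivedDateTime < %s
        GROUP BY carColor, carBrand, fuelType"""
    cond = …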
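Because every iteration issues a separate query against MSSQL, one alternative worth trying is to aggregate the whole range in a single query grouped by calendar day. This is a different technique from the per-day loop above, and it is not guaranteed to fix the timeout. The sketch assumes the `Requests` columns and the `%s` placeholder style from the query above; `RANGE_SQL` and `range_query_params` are hypothetical names. `CAST(... AS date)` is T-SQL (SQL Server 2008 and later).

```python
from datetime import date, timedelta
from typing import Tuple

# Hypothetical single query: one row per (day, color, brand, fuel type) for the whole range.
RANGE_SQL = """SELECT CAST(ReceivedDateTime AS date) AS receivedDate,
       carColor, carBrand, fuelType,
       COUNT(DISTINCT RequestID) AS received
FROM Requests
WHERE ReceivedDateTime >= %s
  AND ReceivedDateTime < %s
GROUP BY CAST(ReceivedDateTime AS date), carColor, carBrand, fuelType"""


def range_query_params(today: date, days: int = 360) -> Tuple[date, date]:
    """Return half-open [start, end) bounds covering `today` and the `days - 1` days before it."""
    end = today + timedelta(days=1)     # midnight after today (exclusive)
    start = end - timedelta(days=days)  # midnight `days` days earlier (inclusive)
    return start, end
```

With the hook from the question, this could be run as `ms_hook.get_records(RANGE_SQL, parameters=range_query_params(date.today()))`, returning all days at once so they can be written to Postgres in one pass. The bounds are returned as `date` objects so the driver binds them, rather than relying on SQL Server's string-to-datetime parsing.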