小编bam*_*dan的帖子

函数内的气流 pythonOperator ds 变量

我知道之前有人问过这个问题,但没有一个答案回答它。我开始有点发疯了!我很困惑所以真的很感激一些帮助。

我有一个带有 python 运算符的 DAG,它运行 SQL 查询并输出到 .csv。第二个运算符仅返回 true 以生成 DAG。我似乎无法访问我的函数中的 ds 变量。我想这样做是为了传递给查询。

from airflow.models import Variable, DAG
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json


sql_path = Variable.get("sql_path")
date = Variable.get("ds")
first_date = Variable.get("ds")

print date

def get_redshift_data(ds,**kwargs):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    params = {'window_start_date':date,'window_end_date':first_date}
    with open(sql_path+"/native.sql") as f:
        sql_file = f.read() % (params)
    df2 = pg_hook.get_pandas_df(sql_file)
    df2.to_csv("test_1.csv", encoding = "utf-8")

def print_test(ds, **kwargs):
    return True

args = {
    'owner': 'Bob', …
Run Code Online (Sandbox Code Playgroud)

python jinja2 amazon-redshift airflow

0
推荐指数
1
解决办法
6509
查看次数

标签 统计

airflow ×1

amazon-redshift ×1

jinja2 ×1

python ×1