Suppose I want to write a DAG that lists all the tables in a particular schema in Redshift. The SQL query is SHOW TABLES;.
How would I create a DAG for it? I think it should look something like this:
dag = airflow.DAG(
    'process_dimensions',
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=60),
    default_args=args,
    max_active_runs=1)

process_product_dim = SQLOperator(
    task_id='process_product_dim',
    conn_id='??????',
    sql='Show Tables',
    dag=dag)
Does anyone know how to write this correctly?
Because you want to return the results of that query rather than just execute it, you will want to use the PostgresHook, specifically its get_records method.
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook


def process_product_dim_py(**kwargs):
    # Use the hook to fetch the result set instead of just executing the statement.
    conn_id = kwargs.get('conn_id')
    pg_hook = PostgresHook(conn_id)
    sql = "SHOW TABLES;"
    records = pg_hook.get_records(sql)
    return records


# `args` is the same default_args dict referenced in the question.
dag = DAG(
    'process_dimensions',
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=60),
    default_args=args,
    max_active_runs=1)

process_product_dim = PythonOperator(
    task_id='process_product_dim',
    op_kwargs={'conn_id': 'my_redshift_connection'},
    python_callable=process_product_dim_py,
    dag=dag)
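Since the PythonOperator pushes its callable's return value to XCom by default, a downstream task can pull the list of tables and act on it. Here is a minimal sketch, assuming the DAG above; the print_tables task and its callable are hypothetical names added purely for illustration:

def print_tables_py(**context):
    # Pull the records returned (and pushed to XCom) by process_product_dim.
    records = context['ti'].xcom_pull(task_ids='process_product_dim')
    for row in records:
        print(row)


print_tables = PythonOperator(
    task_id='print_tables',
    python_callable=print_tables_py,
    provide_context=True,  # Airflow 1.x: pass the task context into **context
    dag=dag)

process_product_dim >> print_tables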