How do I write a DAG that connects to Amazon Redshift?

jac*_*ack 0 airflow

Suppose I want to write a DAG that lists all tables in a specific Redshift schema. The SQL query is Show Tables;

How do I create a DAG for this? I think it should look something like this:

dag = airflow.DAG(
    'process_dimensions',
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=60),
    default_args=args,
    max_active_runs=1)

process_product_dim = SQLOperator(
    task_id='process_product_dim',
    conn_id='??????',
    sql='Show Tables',
    dag=dag)

Does anyone know how to write it properly?

Ben*_*ory 7

Since you want to return the results of that query rather than just execute it, you'll want to use the PostgresHook, specifically its get_records method.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook

def process_product_dim_py(**kwargs):
    # Redshift speaks the Postgres protocol, so a PostgresHook pointed at a
    # Redshift connection works here
    conn_id = kwargs.get('conn_id')
    pg_hook = PostgresHook(conn_id)
    sql = "Show Tables;"

    # get_records executes the query and returns the result set as a list of rows
    records = pg_hook.get_records(sql)

    # The return value is pushed to XCom, where downstream tasks can pull it
    return records

args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),
}

dag = DAG(
    'process_dimensions',
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=60),
    default_args=args,
    max_active_runs=1)

process_product_dim = PythonOperator(
    task_id='process_product_dim',
    op_kwargs={'conn_id': 'my_redshift_connection'},
    python_callable=process_product_dim_py,
    dag=dag)
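One caveat: not every Redshift version accepts `Show Tables;`. As a fallback you can build the equivalent query against `information_schema.tables` and pass that string to get_records instead. A minimal sketch, where `list_tables_sql` is a hypothetical helper name (not part of Airflow or Redshift):

```python
# Sketch: equivalent of "Show Tables" for Redshift clusters that don't
# support it. list_tables_sql is a hypothetical helper, not a library call.
def list_tables_sql(schema):
    """Return SQL that lists all tables in the given Redshift schema."""
    return (
        "SELECT table_name FROM information_schema.tables "
        "WHERE table_schema = '{}';".format(schema)
    )
```

Inside the callable you would then do `records = pg_hook.get_records(list_tables_sql('my_schema'))` rather than hard-coding the SQL string.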