使用 Airflow 将数据从 Big query 传输到 Amazon S3

Abd*_*hel 0 python airflow

如何使用 Airflow 运算符将数据从 Big query 发送到 Amazon s3?

我需要使用哪些运算符?我被困在这个过程的中间。

到目前为止,这是我的代码

bq_check_date = BigQueryCheckOperator(
    task_id='bq_check_date',
    sql='''
    SELECT
    *
    FROM
    `myproject.test.test_table`
    ''',
    use_legacy_sql=False,
    bigquery_conn_id=GCP_CONN_ID,
    dag=dag
)
get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=BQ_DATASET,
    table_id=BQ_TABLE,
    location=LOCATION,
    dag = dag,
    bigquery_conn_id=GCP_CONN_ID,
)

Run Code Online (Sandbox Code Playgroud)

接下来是什么 ?任何想法,非常感谢!

Pan*_*oti 5

您绝对可以探索由Astronomer维护的 Apache 2.0 许可的Astro SDK,它允许使用由 Apache Airflow 提供支持的 Python 和 SQL 快速、干净地开发 {Extract、Load、Transform} 工作流程。

对于您的用例,您可以尝试使用export_to_file运算符。

该软件包支持各种云提供商。对于 Google Bigquery 和 Amazon S3,您可以安装它pip install "astro-sdk-python[amazon,google]"或将此依赖项添加到您requirements.txt 的或者,您可以安装整个包 pip install "astro-sdk-python[all]"

安装后,您的 DAG 片段将至少如下所示:

from astro.constants import FileType
from astro.files import File
from astro import sql as aql
from astro.table import Table, Metadata
from airflow.models import DAG

from datetime import datetime, timedelta

dag = DAG(
    dag_id="example_load_bigquery_to_s3",
    start_date=datetime(2019, 1, 1),
    schedule_interval=timedelta(minutes=30),
)
with dag:
    aql.export_to_file(
        task_id="get_data",
        input_data=Table(
            conn_id="bigquery_conn_id", // This is your Google Bigquery connection name configured in Airflow
            name="test.test_table",
            metadata=Metadata(schema="myproject"),
        ),
        output_file=File(
            path="s3://your_s3_bucket/target_file_name.csv",
            conn_id="aws_default", // This is you AWS S3 connection name configured in Airflow
            filetype=FileType.CSV,
        ),
        if_exists="replace",
    )
Run Code Online (Sandbox Code Playgroud)

免责声明:我在 Astronomer 工作,该公司将Astro SDK作为开源项目进行开发。