如何使用 Airflow 运算符将数据从 Big query 发送到 Amazon s3?
我需要使用哪些运算符?我被困在这个过程的中间。
到目前为止,这是我的代码
bq_check_date = BigQueryCheckOperator(
task_id='bq_check_date',
sql='''
SELECT
*
FROM
`myproject.test.test_table`
''',
use_legacy_sql=False,
bigquery_conn_id=GCP_CONN_ID,
dag=dag
)
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=BQ_DATASET,
table_id=BQ_TABLE,
location=LOCATION,
dag = dag,
bigquery_conn_id=GCP_CONN_ID,
)
Run Code Online (Sandbox Code Playgroud)
接下来是什么 ?任何想法,非常感谢!
您绝对可以探索由Astronomer维护的 Apache 2.0 许可的Astro SDK,它允许使用由 Apache Airflow 提供支持的 Python 和 SQL 快速、干净地开发 {Extract、Load、Transform} 工作流程。
对于您的用例,您可以尝试使用export_to_file运算符。
该软件包支持各种云提供商。对于 Google Bigquery 和 Amazon S3,您可以安装它pip install "astro-sdk-python[amazon,google]"或将此依赖项添加到您requirements.txt
的或者,您可以安装整个包
pip install "astro-sdk-python[all]"
安装后,您的 DAG 片段将至少如下所示:
from astro.constants import FileType
from astro.files import File
from astro import sql as aql
from astro.table import Table, Metadata
from airflow.models import DAG
from datetime import datetime, timedelta
dag = DAG(
dag_id="example_load_bigquery_to_s3",
start_date=datetime(2019, 1, 1),
schedule_interval=timedelta(minutes=30),
)
with dag:
aql.export_to_file(
task_id="get_data",
input_data=Table(
conn_id="bigquery_conn_id", // This is your Google Bigquery connection name configured in Airflow
name="test.test_table",
metadata=Metadata(schema="myproject"),
),
output_file=File(
path="s3://your_s3_bucket/target_file_name.csv",
conn_id="aws_default", // This is you AWS S3 connection name configured in Airflow
filetype=FileType.CSV,
),
if_exists="replace",
)
Run Code Online (Sandbox Code Playgroud)
免责声明:我在 Astronomer 工作,该公司将Astro SDK作为开源项目进行开发。