Mas*_*syB 5 google-bigquery airflow
我有一个简单的DAG
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table20180524'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
start >> bq_query >> end
Run Code Online (Sandbox Code Playgroud)
执行bq_query
任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我仅更改destination_dataset_table
为my_dataset.my_table$20180524
。执行时出现以下错误bq_task
:
Partitioning specification must be provided in order to create partitioned table
Run Code Online (Sandbox Code Playgroud)
如何指定BigQuery将查询结果保存到每日分区表中?我的第一个猜测是使用query_params
in,BigQueryOperator
但是我没有找到任何有关如何使用该参数的示例。
编辑:
我正在使用google-cloud==0.27.0
python客户端...这是Prod中使用的一个客户端:(
您首先需要创建一个空的分区目标表。请按照此处的说明进行操作:链接以创建一个空的分区表
然后再次在气流管道下方运行。您可以尝试以下代码:
import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table={{ params.t_name }}),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={'t_name': table_name},
dag=dag
)
start >> bq_query >> end
Run Code Online (Sandbox Code Playgroud)
因此,我要做的是创建了一个动态表名变量并将其传递给BQ运算符。
归档时间: |
|
查看次数: |
3529 次 |
最近记录: |