最近更新了我的 Airflow,发现BigQueryExecuteQueryOperator
已被弃用,取而代之的是BigQueryInsertJobOperator
. 该文档似乎相当模糊,链接到REST 资源:jobs(和方法:jobs.query)。特别是,我不清楚是否有任何地方可以指定write_disposition
、destination_dataset_table
等。我想确保我不会让事情变得过于复杂。
我目前在哪里做
# my.sql
SELECT * FROM `proj.ds.table_1`
---------------------------------
# my-dag.py
BigQueryExecuteQueryOperator(
task_id='copy-table-1',
sql = 'my.sql',
destination_dataset_table='proj:ds.table_2',
write_disposition='WRITE_EMPTY',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
我现在需要使用DDL 语句吗
# my.sql
CREATE TABLE IF NOT EXISTS
ds.table_2
AS (
SELECT * FROM `proj.ds.table_1`
)
---------------------------------
# my-dag.py
BigQueryInsertJobOperator(
task_id='copy-table-1',
configuration={
'query': my.sql
},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
更不用说以QueryParameter 文档建议的格式将参数传递给查询,而不仅仅是params={ 'table': THE_TABLE }
......
我正在尝试使用 Dataflow、Pub/Sub 和 BigQuery 创建和导出合成数据流。我使用以下模式遵循合成数据生成说明:
{
"id": "{{uuid()}}",
"test_value": {{integer(1,50)}}
}
Run Code Online (Sandbox Code Playgroud)
该架构位于一个文件中gs://my-folder/my-schema.json
。该流似乎运行正常 - 我可以使用“导出到云存储”模板从相应的 Pub/Sub 主题导出到 GCS 存储桶。当我尝试使用“导出到 BigQuery”模板时,我不断收到此错误:
Request failed with code 400, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://bigquery.googleapis.com/bigquery/v2/projects/<my-project>/datasets/<my-dataset>/tables/<my-table>/insertAll.
Run Code Online (Sandbox Code Playgroud)
在开始导出作业之前,我创建了一个空表<my-project>:<my-dataset>.<my-table>
,其中的字段与上面的 JSON 架构匹配:
id STRING NULLABLE
test_value INTEGER NULLABLE
Run Code Online (Sandbox Code Playgroud)
我已经outputTableSpec
设置为<my-project>:<my-dataset>.<my-table>
.