Airflow 中是否有运算符可以根据 BigQuery 中的查询创建表?

Dav*_*sip 2 python google-bigquery airflow

我正在寻找类似的东西

CreateBQTableOperator(
    query='select * from my_table',
    output_table='my_other_table'
)
Run Code Online (Sandbox Code Playgroud)

我正在寻找现有的运算符或此类运算符的代码。操作员应该使用另一个参数来决定是否在重新创建表或将查询附加到当前表之前删除该表(如果该表存在)。

Ela*_*lad 5

对于 Airflow >= 1.10 以及提供程序,您可以使用BigQueryInsertJobOperator此操作程序正在使用JobConfigurationQuery,您可以使用参数配置 API 支持的任何选项configuration

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

execute_query_save = BigQueryInsertJobOperator(
    task_id="execute_query_save",
    configuration={
        "query": {
            "query": "select * from my_table",
            "useLegacySql": False,
            "writeDisposition": "WRITE_EMPTY",
            'destinationTable': {
                'projectId': "my-project",
                'datasetId': "my_data_set",
                'tableId': "table2"
            },
        }
    },
)
Run Code Online (Sandbox Code Playgroud)

对于较旧的 Airflow 版本,您可以使用BigQueryExecuteQueryOperator

运营商有destination_dataset_table

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

execute_query_save = BigQueryExecuteQueryOperator(
    task_id="execute_query_save",
    sql="SELECT * FROM my_data_set.table1",
    use_legacy_sql=False,
    destination_dataset_table="my_data_set.table2",
    location="southamerica-east1",
    write_disposition="WRITE_EMPTY",
    create_disposition="CREATE_IF_NEEDED",
)
Run Code Online (Sandbox Code Playgroud)

您可以通过设置参数值来控制请求的行为(参考Google 文档中的值)。

write_disposition选项有:

WRITE_TRUNCATE:如果表已存在,BigQuery 将覆盖表数据并使用查询结果中的架构。

WRITE_APPEND:如果表已存在,BigQuery 会将数据追加到表中。

WRITE_EMPTY:如果表已存在并包含数据,则作业结果中将返回“重复”错误。

create_disposition选项有:

CREATE_IF_NEEDED:如果该表不存在,BigQuery 将创建该表。

CREATE_NEVER:表必须已经存在。如果没有,作业结果中将返回“notFound”错误。