Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据

Xen*_*rey 8 python mysql json google-bigquery airflow

首先,我使用 MySQL 查询从生产数据库中获取所有数据,然后将该数据NEW LINE DELIMITED JSON存储在谷歌云存储中,我想要做的是:
1. 检查表是否存在
2. 如果表不存在,使用创建表自动检测模式
3. 存储数据

所有这些都将在气流中进行安排。真正让我困惑的是数字2,我如何在 Python 中做到这一点?或者气流可以自动执行此操作吗?

kax*_*xil 7

Airflow 可以自动执行此操作。create_disposition如果需要,该参数会创建表。该autodetect参数正是您所需要的。这是针对Airflow 1.10.2 的

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True, # This uses autodetect
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)


Yun*_*ang 3

在 BigQuery 命令行中,如果您的 json 文件位于 GCS 上,则使用架构自动检测加载 JSON 数据可通过一条命令为您完成 2 + 3 项操作。

\n\n

查看 AirFlow 文档,GoogleCloudStorageToBigQueryOperator似乎在做同样的事情。我检查了它的来源,它只是调用 BigQuery load api。我相信它会做你想做的。

\n\n

当不清楚每个参数的含义时,您可以使用参数名称搜索BigQuery Jobs api

\n\n

例如,要在任务列表中实现 1,您只需指定:

\n\n

write_disposition (string) \xe2\x80\x93 如果表已存在,则写入配置。

\n\n

但为了知道需要作为 write_disposition 传递什么字符串,您必须在 BigQuery 文档中进行搜索。\n在此输入图像描述

\n