Xen*_*rey 8 python mysql json google-bigquery airflow
首先,我使用 MySQL 查询从生产数据库中获取所有数据,然后将该数据NEW LINE DELIMITED JSON存储在谷歌云存储中,我想要做的是:
1. 检查表是否存在
2. 如果表不存在,使用创建表自动检测模式
3. 存储数据
所有这些都将在气流中进行安排。真正让我困惑的是数字2,我如何在 Python 中做到这一点?或者气流可以自动执行此操作吗?
Airflow 可以自动执行此操作。create_disposition如果需要,该参数会创建表。该autodetect参数正是您所需要的。这是针对Airflow 1.10.2 的。
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
autodetect=True, # This uses autodetect
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
在 BigQuery 命令行中,如果您的 json 文件位于 GCS 上,则使用架构自动检测加载 JSON 数据可通过一条命令为您完成 2 + 3 项操作。
\n\n查看 AirFlow 文档,GoogleCloudStorageToBigQueryOperator似乎在做同样的事情。我检查了它的来源,它只是调用 BigQuery load api。我相信它会做你想做的。
\n\n当不清楚每个参数的含义时,您可以使用参数名称搜索BigQuery Jobs api。
\n\n例如,要在任务列表中实现 1,您只需指定:
\n\nwrite_disposition (string) \xe2\x80\x93 如果表已存在,则写入配置。
\n\n但为了知道需要作为 write_disposition 传递什么字符串,您必须在 BigQuery 文档中进行搜索。\n
| 归档时间: |
|
| 查看次数: |
4500 次 |
| 最近记录: |