JW2*_*JW2 3 google-bigquery airflow
我需要设计一个 Airflow 管道来将 CSV 加载到 BigQuery 中。
我知道 CSV 的架构经常发生变化。加载第一个文件后,架构可能是
id | ps_1 | ps_1_value
当第二个文件到达并加载它时,它可能看起来像
id | ps_1 | ps_1_value | ps_1 | ps_2_value。
处理这个问题的最佳方法是什么?
我对此的第一个想法是
我会在 PythonOperator 中执行此操作。
如果文件 3 进来并且看起来id | ps_2 | ps_2_value我会填写缺失的列并进行插入。
感谢您的反馈。
加载两个先前的文件后example_data_1.csv,example_data_2.csv我可以看到字段被插入到正确的列中,并根据需要添加新列。
编辑:灵光乍现的时刻是意识到schema_update_options存在。请参阅此处: https: //googleapis.dev/python/bigquery/latest/ generated/google.cloud.bigquery.job.SchemaUpdateOption.html
csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
task_id='csv_to_bigquery',
google_cloud_storage_conn_id='google_cloud_default',
bucket=airflow_bucket,
source_objects=['data/example_data_3.csv'],
skip_leading_rows=1,
bigquery_conn_id='google_cloud_default',
destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
autodetect=True,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
5303 次 |
| 最近记录: |