Airflow Pipeline CSV 到 BigQuery 并进行架构更改

JW2*_*JW2 3 google-bigquery airflow

背景

我需要设计一个 Airflow 管道来将 CSV 加载到 BigQuery 中。

我知道 CSV 的架构经常发生变化。加载第一个文件后,架构可能是

id | ps_1 | ps_1_value

当第二个文件到达并加载它时,它可能看起来像

id | ps_1 | ps_1_value | ps_1 | ps_2_value

问题

处理这个问题的最佳方法是什么?


我对此的第一个想法是

  1. 加载第二个文件
  2. 将架构与当前表进行比较
  3. 更新表,添加两列(ps_2、ps_2_value)
  4. 插入新行

我会在 PythonOperator 中执行此操作。

如果文件 3 进来并且看起来id | ps_2 | ps_2_value我会填写缺失的列并进行插入。

感谢您的反馈。

JW2*_*JW2 6

加载两个先前的文件后example_data_1.csvexample_data_2.csv我可以看到字段被插入到正确的列中,并根据需要添加新列。

编辑:灵光乍现的时刻是意识到schema_update_options存在。请参阅此处: https: //googleapis.dev/python/bigquery/latest/ generated/google.cloud.bigquery.job.SchemaUpdateOption.html

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',    
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述