GoogleCloudStorageToBigQueryOperator source_objects 通过 XCom 接收列表

jud*_*ole 1 python airflow

我想将包含 google 存储中文件名的字符串列表传递给 XCom。稍后由 GoogleCloudStorageToBigQueryOperator 任务获取。该source_objects场模板,使神社模板可以使用。不幸的是,Jinja 只能返回一个字符串,因此我无法在 XCom 中传递列表。

如何在 GoogleCloudStorageToBigQueryOperator 中使用 XCom 列表?

参考类似问题,通过使用provide_context解决: 将字符串列表作为 Airflow 中依赖任务的参数传递

我找到的最接近的解决方案是创建一个包装类并发送发布 xcom 的任务的 ID,如下所示:

@apply_defaults
def __init__(self, source_objects_task_id,
....
def execute(self, context):
    source_objects = context['ti']
          .xcom_pull(task_ids=self.source_objects_task_id)
    operator = GoogleCloudStorageToBigQueryOperator(
          source_objects=source_objects,
          dag=self.dag,
....
)

    operator.execute(context)
Run Code Online (Sandbox Code Playgroud)

kax*_*xil 5

不确定如何获取 Google Cloud Storage 对象列表,但如果您正在使用它,GoogleCloudStorageListOperator那么您可以像在 BigQuery Web UI 中一样将通配符传递给source_objects参数GoogleCloudStorageToBigQueryOperator

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    schema_object='gs://test-bucket/schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

如果您想使用 获取其他任务的列表xcom,您可以创建一个新的操作符或 Airflow 插件来GoogleCloudStorageToBigQueryOperator添加新的 param source_objects_task_id,删除source_objectsparam 并替换以下代码(第 203 和 204 行:https : //github.com/ apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]
Run Code Online (Sandbox Code Playgroud)

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]
Run Code Online (Sandbox Code Playgroud)

并按如下方式使用它:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects_task_id='task-id-of-previos-task',
    destination_project_dataset_table='dest_table',
    schema_object='gs://test-bucket/schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

  • 惊人的。很高兴它帮助了你:) (2认同)