我想将包含 google 存储中文件名的字符串列表传递给 XCom。稍后由 GoogleCloudStorageToBigQueryOperator 任务获取。该source_objects场模板,使神社模板可以使用。不幸的是,Jinja 只能返回一个字符串,因此我无法在 XCom 中传递列表。
如何在 GoogleCloudStorageToBigQueryOperator 中使用 XCom 列表?
参考类似问题,通过使用provide_context解决: 将字符串列表作为 Airflow 中依赖任务的参数传递
我找到的最接近的解决方案是创建一个包装类并发送发布 xcom 的任务的 ID,如下所示:
@apply_defaults
def __init__(self, source_objects_task_id,
....
def execute(self, context):
source_objects = context['ti']
.xcom_pull(task_ids=self.source_objects_task_id)
operator = GoogleCloudStorageToBigQueryOperator(
source_objects=source_objects,
dag=self.dag,
....
)
operator.execute(context)
Run Code Online (Sandbox Code Playgroud)
不确定如何获取 Google Cloud Storage 对象列表,但如果您正在使用它,GoogleCloudStorageListOperator
那么您可以像在 BigQuery Web UI 中一样将通配符传递给source_objects
参数GoogleCloudStorageToBigQueryOperator
:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
schema_object='gs://test-bucket/schema.json',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
如果您想使用 获取其他任务的列表xcom
,您可以创建一个新的操作符或 Airflow 插件来GoogleCloudStorageToBigQueryOperator
添加新的 param source_objects_task_id
,删除source_objects
param 并替换以下代码(第 203 和 204 行:https : //github.com/ apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
for source_object in self.source_objects]
Run Code Online (Sandbox Code Playgroud)
和
source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]
Run Code Online (Sandbox Code Playgroud)
并按如下方式使用它:
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects_task_id='task-id-of-previos-task',
destination_project_dataset_table='dest_table',
schema_object='gs://test-bucket/schema.json',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6076 次 |
最近记录: |