相关疑难解决方法(0)

Airflow xcom pull 只返回字符串

我有一个气流管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中。我使用 CloudSqlInstanceImportOperator 导入 CSV 文件。该运算符需要一个主体,其中包含文件名和其他参数。由于我在运行时读取了该文件名,因此我还必须在运行时定义主体。这一切都有效。但是当我从 xcom 拉出正文时,它返回一个字符串而不是 python 字典。所以 CloudSqlInstanceImportOperator 给了我以下错误(我的猜测是,因为主体是字符串而不是字典):

Traceback (most recent call last)
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 984, in _run_raw_tas
    result = task_copy.execute(context=context
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 715, in execut
    self._validate_body_fields(
  File "/usr/local/lib/airflow/airflow/contrib/operators/gcp_sql_operator.py", line 712, in _validate_body_field
    api_version=self.api_version).validate(self.body
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 420, in validat
    dictionary_to_validate=body_to_validate
  File "/usr/local/lib/airflow/airflow/contrib/utils/gcp_field_validator.py", line 341, in _validate_fiel
    value = dictionary_to_validate.get(field_name
AttributeError: 'str' object has no attribute 'get
Run Code Online (Sandbox Code Playgroud)

这是我使用的代码:

import json 
import os
from datetime import datetime, timedelta
import ast
from airflow …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-sql google-cloud-platform airflow

3
推荐指数
1
解决办法
2826
查看次数