Apache Airflow - error connecting to AWS S3

qal*_*lis 5 python amazon-s3 amazon-web-services airflow

I'm trying to get an S3 hook in Apache Airflow using a Connection object.

It looks like this:

class S3ConnectionHandler:
    def __init__(self):
        # values are read from configuration class, which loads from env. variables
        self._s3 = Connection(
            conn_type="s3",
            conn_id=config.AWS_CONN_ID,
            login=config.AWS_ACCESS_KEY_ID,
            password=config.AWS_SECRET_ACCESS_KEY,
            extra=json.dumps({"region_name": config.AWS_DEFAULT_REGION}),
        )

    @property
    def s3(self) -> Connection:
        return get_live_connection(self.logger, self._s3)

    @property
    def s3_hook(self) -> S3Hook:
        return self.s3.get_hook()

I get this error:

Broken DAG: [...] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/connection.py", line 282, in get_hook
    return hook_class(**{conn_id_param: self.conn_id})
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 354, in __init__
    raise AirflowException('Either client_type or resource_type must be provided.')
airflow.exceptions.AirflowException: Either client_type or resource_type must be provided.

Why does this happen? As I understand it, S3Hook calls the constructor of its parent class, AwsHook, passing client_type as the string "s3". How can I fix this?

I took the hook's configuration from here.

EDIT: I even get the same error when creating the S3 hook directly:

    @property
    def s3_hook(self) -> S3Hook:
        #return self.s3.get_hook()
        return S3Hook(
            aws_conn_id=config.AWS_CONN_ID,
            region_name=self.config.AWS_DEFAULT_REGION,
            client_type="s3",
            config={"aws_access_key_id": self.config.AWS_ACCESS_KEY_ID, "aws_secret_access_key": self.config.AWS_SECRET_ACCESS_KEY}
        )

qal*_*lis 1

None of the other answers worked, and I couldn't resolve this. I ended up using the boto3 library directly, which also gives me more low-level flexibility than the Airflow hooks offer.