AuthorizationException: The security token included in the request is expired

chi*_*e21 5 amazon-web-services elasticsearch boto3 aws-fargate

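I have an AWS Fargate task that needs to query Elasticsearch and DynamoDB. The role associated with the cluster has permission to access both services. After some time (about 40 minutes), I start getting this error: AuthorizationException: TransportError(403, '{"message":"The security token included in the request is expired"}'). The error is raised when I access Elasticsearch; it does not occur when I access DynamoDB. I am using boto3 version 1.9.160, and I obtain the credentials with the following lines of code: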

import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

session = boto3.Session()
dynamodb_client_nvirginia = session.client(service_name='dynamodb')
# The key, secret, and token are read once here and passed to AWS4Auth as plain strings.
credentials = session.get_credentials()
aws_auth = AWS4Auth(
    credentials.access_key, credentials.secret_key, 'us-east-1', 'es',
    session_token=credentials.token)
elasticsearch_client = Elasticsearch(
    hosts=[{'host': 'my-elasticsearch-host', 'port': 443}],
    http_auth=aws_auth, use_ssl=True, verify_certs=True,
    connection_class=RequestsHttpConnection, timeout=30, max_retries=10, retry_on_timeout=True)

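I have read that boto3 refreshes credentials automatically.

As an experiment, I decided to refresh the credentials used to connect to Elasticsearch after 30 minutes, but I still get the same error.

What am I doing wrong?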

chi*_*e21 0

I decided to handle this error with a singleton that refreshes the client credentials. I created this class:

from typing import Optional


class ElasticsearchClientInstanceGenerator:

    __instance = None

    def __init__(self):
        """ Virtually private constructor. """
        if self.__instance is not None:
            raise Exception("This class is a singleton!")
        else:
            ElasticsearchClientInstanceGenerator.__instance = self
        self.__elasticsearch_client = None
        self.__host: Optional[str] = None
        self.__port: Optional[int] = None
        self.__use_ssl: Optional[bool] = None
        self.__verify_certs: Optional[bool] = None

    def generate_elasticsearch_client(self):
        import boto3
        from requests_aws4auth import AWS4Auth
        from elasticsearch import Elasticsearch
        from elasticsearch import RequestsHttpConnection
        session = boto3.Session()
        credentials = session.get_credentials()
        aws_auth = AWS4Auth(
            credentials.access_key, credentials.secret_key, 'us-east-1', 'es', session_token=credentials.token)
        self.__elasticsearch_client = Elasticsearch(
            hosts=[{'host': self.__host, 'port': self.__port}],
            http_auth=aws_auth, use_ssl=self.__use_ssl, verify_certs=self.__verify_certs,
            connection_class=RequestsHttpConnection, timeout=30, max_retries=10, retry_on_timeout=True)

    def setup(self, host: str, port: int, use_ssl: bool, verify_certs: bool):
        self.__host: str = host
        self.__port: int = port
        self.__use_ssl: bool = use_ssl
        self.__verify_certs: bool = verify_certs

    @staticmethod
    def get_instance():
        if ElasticsearchClientInstanceGenerator.__instance is None:
            ElasticsearchClientInstanceGenerator()
        return ElasticsearchClientInstanceGenerator.__instance
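A likely reason rebuilding the client helps: `AWS4Auth` is given the access key, secret key, and session token as plain strings, so it keeps whatever values were current when it was constructed, while boto3 only refreshes the credentials object it manages itself. The sketch below illustrates that snapshot effect using only the standard library; `RotatingCredentials` and `StaticSigner` are hypothetical stand-ins, not boto3 or requests_aws4auth classes.

```python
import itertools


class RotatingCredentials:
    """Hypothetical stand-in for a refreshable credentials object.

    Reading `token` returns the currently valid token; `rotate()` simulates
    the role's temporary credentials being replaced.
    """

    def __init__(self):
        self._counter = itertools.count(1)
        self._token = f"token-{next(self._counter)}"

    @property
    def token(self):
        return self._token

    def rotate(self):
        self._token = f"token-{next(self._counter)}"


class StaticSigner:
    """Hypothetical signer that copies the token once, at construction."""

    def __init__(self, token):
        self.token = token


credentials = RotatingCredentials()
signer = StaticSigner(credentials.token)  # snapshot taken here

credentials.rotate()  # the provider issues new temporary credentials

# The old signer still holds the token it was built with.
stale = signer.token != credentials.token

# Building a new signer takes a fresh snapshot of the current token.
rebuilt = StaticSigner(credentials.token)
fresh = rebuilt.token == credentials.token

print(stale, fresh)
```

Under this reading, `generate_elasticsearch_client()` works because it reads the credentials again and builds a new `AWS4Auth` each time it is called. Newer releases of requests_aws4auth reportedly accept a `refreshable_credentials` argument that avoids the snapshot altogether; that is worth checking against the documentation for the installed version.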

Then I created this decorator:

import functools

from elasticsearch.exceptions import AuthorizationException


def try_until_succeed(func):

    @functools.wraps(func)
    def catch_authorization_exception(*args, **kwargs):
        # Retry up to 10 times, rebuilding the client after each authorization failure.
        for _ in range(10):
            try:
                return func(*args, **kwargs)
            except AuthorizationException:
                ElasticsearchClientInstanceGenerator.get_instance().generate_elasticsearch_client()
        raise Exception('Elasticsearch general exception')
    return catch_authorization_exception

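I apply the decorator to every method that needs to connect to Elasticsearch. Whenever I get an AuthorizationException, the client credentials are refreshed, and after that I can connect to Elasticsearch again.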
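The retry-and-rebuild flow can be tried out without AWS or Elasticsearch. The following is a minimal sketch under stated assumptions: `AuthExpired` stands in for `AuthorizationException`, `FakeClientHolder.rebuild()` plays the role of `generate_elasticsearch_client()`, and `retry_on_auth_error` and `run_query` are hypothetical names used only for illustration.

```python
import functools


class AuthExpired(Exception):
    """Hypothetical stand-in for elasticsearch's AuthorizationException."""


class FakeClientHolder:
    """Hypothetical holder whose rebuild() mimics regenerating the client."""

    def __init__(self):
        self.token_valid = False  # start with an expired token
        self.rebuild_count = 0

    def rebuild(self):
        self.rebuild_count += 1
        self.token_valid = True

    def search(self):
        if not self.token_valid:
            raise AuthExpired("The security token included in the request is expired")
        return {"hits": []}


holder = FakeClientHolder()


def retry_on_auth_error(max_attempts=3):
    """Retry the wrapped call, rebuilding the client after each auth failure."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for _ in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except AuthExpired:
                    holder.rebuild()
            raise RuntimeError("still unauthorized after rebuilding the client")

        return wrapper

    return decorator


@retry_on_auth_error(max_attempts=3)
def run_query():
    # Look the client up at call time so a rebuilt client is used on retry.
    return holder.search()


result = run_query()
print(result, holder.rebuild_count)
```

One design point the sketch makes visible: the decorated function has to fetch the client each time it runs. If it held on to a reference to the old client, every retry would fail with the same expired token.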