chi*_*e21 5 amazon-web-services elasticsearch boto3 aws-fargate
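I have an AWS Fargate task that needs to query Elasticsearch and DynamoDB. The role associated with the cluster has permission to access both services. After a while (about 40 minutes), I start getting this error: AuthorizationException: TransportError(403, '{"message":"The security token included in the request is expired"}'). The error is raised when I try to access Elasticsearch; it does not occur when I access DynamoDB. I am using boto3 version 1.9.160, and I obtain the credentials with the following lines of code: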
import boto3
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

session = boto3.Session()
dynamodb_client_nvirginia = session.client(service_name='dynamodb')
# The key, secret and token are read here once and passed on as plain strings.
aws_auth = AWS4Auth(
    session.get_credentials().access_key, session.get_credentials().secret_key,
    'us-east-1', 'es', session_token=session.get_credentials().token)
elasticsearch_client = Elasticsearch(
    hosts=[{'host': 'my-elasticsearch-host', 'port': 443}],
    http_auth=aws_auth, use_ssl=True, verify_certs=True,
    connection_class=RequestsHttpConnection, timeout=30, max_retries=10, retry_on_timeout=True)
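I have read that boto3 refreshes credentials automatically.
As an experiment, I decided to refresh the credentials used to connect to Elasticsearch after 30 minutes, but I still get the same error.
What am I doing wrong?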
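A likely explanation for why only Elasticsearch fails: the boto3 DynamoDB client looks up the session credentials each time it signs a request, whereas the AWS4Auth object in the question receives the key, secret and token as plain strings, so it keeps whatever values were current when it was built. The sketch below illustrates that difference using only the standard library. RotatingProvider, FrozenCreds, SnapshotAuth and PerRequestAuth are hypothetical stand-ins invented for this illustration, not boto3 or requests_aws4auth classes.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class FrozenCreds:
    """Hypothetical stand-in for one set of temporary credentials."""
    access_key: str
    secret_key: str
    token: str


class RotatingProvider:
    """Hypothetical stand-in for a refreshable credential source.

    Each call to rotate() simulates the task role issuing new credentials.
    """

    def __init__(self) -> None:
        self._generation = 0

    def rotate(self) -> None:
        self._generation += 1

    def current(self) -> FrozenCreds:
        g = self._generation
        return FrozenCreds(f"AK{g}", f"SK{g}", f"token-{g}")


class SnapshotAuth:
    """Copies the credential values once, when the auth object is built."""

    def __init__(self, creds: FrozenCreds) -> None:
        self._creds = creds

    def token_for_request(self) -> str:
        return self._creds.token


class PerRequestAuth:
    """Asks the provider again for every request, so rotation is picked up."""

    def __init__(self, get_creds: Callable[[], FrozenCreds]) -> None:
        self._get_creds = get_creds

    def token_for_request(self) -> str:
        return self._get_creds().token


provider = RotatingProvider()
snapshot = SnapshotAuth(provider.current())      # values copied now
per_request = PerRequestAuth(provider.current)   # provider consulted later

provider.rotate()  # simulate the temporary credentials being replaced

print(snapshot.token_for_request())     # token-0 (stale)
print(per_request.token_for_request())  # token-1 (current)
```

In the real setup, the equivalent of PerRequestAuth would be an auth object that reads the session credentials at signing time rather than at construction time; whether the installed version of requests_aws4auth offers that directly should be checked against its documentation.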
I decided to use a singleton to handle this error and refresh the client credentials. I created this class:
from typing import Optional


class ElasticsearchClientInstanceGenerator:
    __instance = None

    def __init__(self):
        """ Virtually private constructor. """
        if self.__instance is not None:
            raise Exception("This class is a singleton!")
        else:
            ElasticsearchClientInstanceGenerator.__instance = self
            self.__elasticsearch_client = None
            self.__host: Optional[str] = None
            self.__port: Optional[int] = None
            self.__use_ssl: Optional[bool] = None
            self.__verify_certs: Optional[bool] = None

    def generate_elasticsearch_client(self):
        import boto3
        from requests_aws4auth import AWS4Auth
        from elasticsearch import Elasticsearch
        from elasticsearch import RequestsHttpConnection
        # A new session is created on every call, so the current credentials are read again.
        session = boto3.Session()
        credentials = session.get_credentials()
        aws_auth = AWS4Auth(
            credentials.access_key, credentials.secret_key, 'us-east-1', 'es', session_token=credentials.token)
        self.__elasticsearch_client = Elasticsearch(
            hosts=[{'host': self.__host, 'port': self.__port}],
            http_auth=aws_auth, use_ssl=self.__use_ssl, verify_certs=self.__verify_certs,
            connection_class=RequestsHttpConnection, timeout=30, max_retries=10, retry_on_timeout=True)

    def setup(self, host: str, port: int, use_ssl: bool, verify_certs: bool):
        # Connection settings must be set before generate_elasticsearch_client() is called.
        self.__host: str = host
        self.__port: int = port
        self.__use_ssl: bool = use_ssl
        self.__verify_certs: bool = verify_certs

    @staticmethod
    def get_instance():
        if ElasticsearchClientInstanceGenerator.__instance is None:
            ElasticsearchClientInstanceGenerator()
        return ElasticsearchClientInstanceGenerator.__instance
Then I created this decorator:
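from elasticsearch.exceptions import AuthorizationException


def try_until_succeed(func):
    def catch_authorization_exception(*args, **kwargs):
        # Try up to 10 times, rebuilding the client after each authorization failure.
        for _ in range(0, 10):
            try:
                data = func(*args, **kwargs)
                return data
            except AuthorizationException:
                ElasticsearchClientInstanceGenerator.get_instance().generate_elasticsearch_client()
        # Every attempt failed with an authorization error.
        raise Exception('Elasticsearch general exception')
    return catch_authorization_exception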
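I use the decorator on every method that needs to connect to Elasticsearch. Whenever I get an AuthorizationException, the client credentials are refreshed, and after that I can connect to Elasticsearch again.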
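To see the retry-and-rebuild pattern in isolation, here is a minimal, self-contained sketch that runs without Elasticsearch or AWS. It is not the code from this post: the local AuthorizationException class is a stand-in for elasticsearch.exceptions.AuthorizationException, and retry_on_expired_token, fake_rebuild, search and the state dictionary are hypothetical names made up for the example. It also uses functools.wraps so the wrapped function keeps its name.

```python
import functools
from typing import Any, Callable, Dict, List


class AuthorizationException(Exception):
    """Stand-in for elasticsearch.exceptions.AuthorizationException."""


def retry_on_expired_token(rebuild_client: Callable[[], None], attempts: int = 10):
    """Retry the wrapped call, invoking rebuild_client() after each auth failure."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for _ in range(attempts):
                try:
                    return func(*args, **kwargs)
                except AuthorizationException:
                    rebuild_client()
            raise RuntimeError(f"still unauthorized after {attempts} attempts")
        return wrapper
    return decorator


# Simulated environment: the token starts out expired and a rebuild fixes it.
state: Dict[str, bool] = {"expired": True}
rebuilds: List[int] = []


def fake_rebuild() -> None:
    rebuilds.append(1)
    state["expired"] = False


@retry_on_expired_token(fake_rebuild)
def search(query: str) -> Dict[str, Any]:
    if state["expired"]:
        raise AuthorizationException("security token expired")
    return {"query": query, "hits": 3}


result = search("status:ok")
print(result)         # {'query': 'status:ok', 'hits': 3}
print(len(rebuilds))  # 1
```

Passing the rebuild step in as a callable keeps the decorator independent of the singleton, which makes it easy to exercise with a fake as done here. For the retry to help in real code, the decorated function has to fetch the client again on each call rather than hold on to the old one.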