Kye*_*JmD 5 python amazon-web-services python-3.x amazon-cognito python-3.6
这是我的脚本
import urllib.request
import json
import time
from jose import jwk, jwt
from jose.utils import base64url_decode
import base64
region = '....'
userpool_id = '.....'
app_client_id = '...'
keys_url = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpool_id)
response = urllib.request.urlopen(keys_url)
keys = json.loads(response.read())['keys']
token = request.headers['Authorization']
print(token)
# get the kid from the headers prior to verification
headers = jwt.get_unverified_headers(request.headers['Authorization'])
kid = headers['kid']
print(kid)
# search for the kid in the downloaded public keys
key_index = -1
for i in range(len(keys)):
if kid == keys[i]['kid']:
key_index = i
break
if key_index == -1:
print('Public key not found in jwks.json')
return False
# construct the public key
public_key = jwk.construct(keys[key_index])
# get the last two sections of the token,
# message and signature (encoded in base64)
message, encoded_signature = str(token).rsplit('.', 1)
# decode the
print('>>encoded signature')
print(encoded_signature)
decoded_signature = base64.b64decode(encoded_signature)
if not public_key.verify(message, decoded_signature):
print('Signature verification failed')
return False
print('Signature successfully verified')
Run Code Online (Sandbox Code Playgroud)
即使 jwt 令牌是由有效的合法认知用户池生成的,我总是以签名验证失败告终。我查看了文档,它并没有真正指定整个验证过程。
jcv*_*rde 12
我看到您正在使用 jose,而我正在使用 pyjwt,但这个解决方案可能对您有帮助。底部的大部分批量代码来自“api-gateway-authorizer-python”蓝图。请注意,这是非常脆弱的代码,如果出现任何问题就会中断,我最终没有使用 lambda 身份验证,而是为带有身份池的 API 网关选择了 AWS_IAM 身份验证,因此我从未完成它。
此示例要求您在工作目录中使用 pip 安装 pyjwt 和加密技术,并将所有内容上传为 .zip 文件。
如果您想考虑 AWS_IAM 身份验证选项,我建议您观看此视频:https://www.youtube.com/watch?v=VZqG7HjT2AQ
他们还在 github 上提供了一个更复杂的 lambda 授权器实现的解决方案,网址为:https://github.com/awslabs/aws-serverless-auth-reference-app(他们在视频开头显示了链接),但我不知道不知道他们的 pip 依赖性。
from __future__ import print_function
from jwt.algorithms import RSAAlgorithm
import re
import jwt
import json
import sys
import urllib
region = 'your-region'
userpoolId = 'your-user-pool-id'
appClientId = 'your-app-client-id'
keysUrl = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpoolId)
def lambda_handler(event, context):
bearerToken = event['authorizationToken']
methodArn = event['methodArn']
print("Client token: " + bearerToken)
print("Method ARN: " + methodArn)
response = urllib.urlopen(keysUrl)
keys = json.loads(response.read())['keys']
jwtToken = bearerToken.split(' ')[-1]
header = jwt.get_unverified_header(jwtToken)
kid = header['kid']
jwkValue = findJwkValue(keys, kid)
publicKey = RSAAlgorithm.from_jwk(json.dumps(jwkValue))
decoded = decodeJwtToken(jwtToken, publicKey)
print('Decoded token: ' + json.dumps(decoded))
principalId = decoded['cognito:username']
methodArn = event['methodArn'].split(':')
apiGatewayArnTmp = methodArn[5].split('/')
awsAccountId = methodArn[4]
policy = AuthPolicy(principalId, awsAccountId)
policy.restApiId = apiGatewayArnTmp[0]
policy.region = methodArn[3]
policy.stage = apiGatewayArnTmp[1]
#policy.denyAllMethods()
policy.allowAllMethods()
# Finally, build the policy
authResponse = policy.build()
# new! -- add additional key-value pairs associated with the authenticated principal
# these are made available by APIGW like so: $context.authorizer.<key>
# additional context is cached
context = {
'key': 'value', # $context.authorizer.key -> value
'number': 1,
'bool': True
}
# context['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
# context['obj'] = {'foo':'bar'} <- also invalid
authResponse['context'] = context
return authResponse
def findJwkValue(keys, kid):
for key in keys:
if key['kid'] == kid:
return key
def decodeJwtToken(token, publicKey):
try:
decoded=jwt.decode(token, publicKey, algorithms=['RS256'], audience=appClientId)
return decoded
except Exception as e:
print(e)
raise
class HttpVerb:
GET = 'GET'
POST = 'POST'
PUT = 'PUT'
PATCH = 'PATCH'
HEAD = 'HEAD'
DELETE = 'DELETE'
OPTIONS = 'OPTIONS'
ALL = '*'
class AuthPolicy(object):
# The AWS account id the policy will be generated for. This is used to create the method ARNs.
awsAccountId = ''
# The principal used for the policy, this should be a unique identifier for the end user.
principalId = ''
# The policy version used for the evaluation. This should always be '2012-10-17'
version = '2012-10-17'
# The regular expression used to validate resource paths for the policy
pathRegex = '^[/.a-zA-Z0-9-\*]+$'
'''Internal lists of allowed and denied methods.
These are lists of objects and each object has 2 properties: A resource
ARN and a nullable conditions statement. The build method processes these
lists and generates the approriate statements for the final policy.
'''
allowMethods = []
denyMethods = []
# The API Gateway API id. By default this is set to '*'
restApiId = '*'
# The region where the API is deployed. By default this is set to '*'
region = '*'
# The name of the stage used in the policy. By default this is set to '*'
stage = '*'
def __init__(self, principal, awsAccountId):
self.awsAccountId = awsAccountId
self.principalId = principal
self.allowMethods = []
self.denyMethods = []
def _addMethod(self, effect, verb, resource, conditions):
'''Adds a method to the internal lists of allowed or denied methods. Each object in
the internal list contains a resource ARN and a condition statement. The condition
statement can be null.'''
if verb != '*' and not hasattr(HttpVerb, verb):
raise NameError('Invalid HTTP verb ' + verb + '. Allowed verbs in HttpVerb class')
resourcePattern = re.compile(self.pathRegex)
if not resourcePattern.match(resource):
raise NameError('Invalid resource path: ' + resource + '. Path should match ' + self.pathRegex)
if resource[:1] == '/':
resource = resource[1:]
resourceArn = 'arn:aws:execute-api:{}:{}:{}/{}/{}/{}'.format(self.region, self.awsAccountId, self.restApiId, self.stage, verb, resource)
if effect.lower() == 'allow':
self.allowMethods.append({
'resourceArn': resourceArn,
'conditions': conditions
})
elif effect.lower() == 'deny':
self.denyMethods.append({
'resourceArn': resourceArn,
'conditions': conditions
})
def _getEmptyStatement(self, effect):
'''Returns an empty statement object prepopulated with the correct action and the
desired effect.'''
statement = {
'Action': 'execute-api:Invoke',
'Effect': effect[:1].upper() + effect[1:].lower(),
'Resource': []
}
return statement
def _getStatementForEffect(self, effect, methods):
'''This function loops over an array of objects containing a resourceArn and
conditions statement and generates the array of statements for the policy.'''
statements = []
if len(methods) > 0:
statement = self._getEmptyStatement(effect)
for curMethod in methods:
if curMethod['conditions'] is None or len(curMethod['conditions']) == 0:
statement['Resource'].append(curMethod['resourceArn'])
else:
conditionalStatement = self._getEmptyStatement(effect)
conditionalStatement['Resource'].append(curMethod['resourceArn'])
conditionalStatement['Condition'] = curMethod['conditions']
statements.append(conditionalStatement)
if statement['Resource']:
statements.append(statement)
return statements
def allowAllMethods(self):
'''Adds a '*' allow to the policy to authorize access to all methods of an API'''
self._addMethod('Allow', HttpVerb.ALL, '*', [])
def denyAllMethods(self):
'''Adds a '*' allow to the policy to deny access to all methods of an API'''
self._addMethod('Deny', HttpVerb.ALL, '*', [])
def allowMethod(self, verb, resource):
'''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods for the policy'''
self._addMethod('Allow', verb, resource, [])
def denyMethod(self, verb, resource):
'''Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods for the policy'''
self._addMethod('Deny', verb, resource, [])
def allowMethodWithConditions(self, verb, resource, conditions):
'''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
methods and includes a condition for the policy statement. More on AWS policy
conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
self._addMethod('Allow', verb, resource, conditions)
def denyMethodWithConditions(self, verb, resource, conditions):
'''Adds an API Gateway method (Http verb + Resource path) to the list of denied
methods and includes a condition for the policy statement. More on AWS policy
conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
self._addMethod('Deny', verb, resource, conditions)
def build(self):
'''Generates the policy document based on the internal lists of allowed and denied
conditions. This will generate a policy with two main statements for the effect:
one statement for Allow and one statement for Deny.
Methods that includes conditions will have their own statement in the policy.'''
if ((self.allowMethods is None or len(self.allowMethods) == 0) and
(self.denyMethods is None or len(self.denyMethods) == 0)):
raise NameError('No statements defined for the policy')
policy = {
'principalId': self.principalId,
'policyDocument': {
'Version': self.version,
'Statement': []
}
}
policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Allow', self.allowMethods))
policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Deny', self.denyMethods))
return policy
Run Code Online (Sandbox Code Playgroud)