How do I verify the signature of a JWT generated by AWS Cognito in Python 3.6?

Kye*_*JmD 5 python amazon-web-services python-3.x amazon-cognito python-3.6

Here is my script:

import urllib.request
import json
import time
from jose import jwk, jwt
from jose.utils import base64url_decode
import base64


region = '....'
userpool_id = '.....'
app_client_id = '...' 
keys_url = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpool_id)

response = urllib.request.urlopen(keys_url)
keys = json.loads(response.read())['keys']
token = request.headers['Authorization']
print(token)
# get the kid from the headers prior to verification
headers = jwt.get_unverified_headers(request.headers['Authorization'])
kid = headers['kid']

print(kid)
# search for the kid in the downloaded public keys
key_index = -1
for i in range(len(keys)):
    if kid == keys[i]['kid']:
        key_index = i
        break
if key_index == -1:
    print('Public key not found in jwks.json')
    return False
# construct the public key
public_key = jwk.construct(keys[key_index])
# get the last two sections of the token,
# message and signature (encoded in base64)
message, encoded_signature = str(token).rsplit('.', 1)
# decode the signature
print('>>encoded signature')
print(encoded_signature)
decoded_signature = base64.b64decode(encoded_signature)
if not public_key.verify(message, decoded_signature):
    print('Signature verification failed')
    return False
print('Signature successfully verified')

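Even though the JWT was generated by a valid, legitimate Cognito user pool, I always end up with "Signature verification failed". I have looked at the documentation, but it does not really spell out the whole verification process.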
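
One thing worth checking in the script above, offered as a sketch rather than a confirmed diagnosis: JWT segments are base64url-encoded with the trailing padding stripped (RFC 7515), so `base64.b64decode` is not the matching decoder for the signature, and an `Authorization` header may carry a `Bearer ` prefix that has to be removed first. The helper names `b64url_decode` and `split_token` below are hypothetical and use only the standard library; the token is made up for illustration.

```python
import base64


def b64url_decode(segment: str) -> bytes:
    """Decode one unpadded base64url JWT segment into raw bytes."""
    # JWTs drop the trailing '=' padding, so restore it before decoding.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def split_token(auth_header: str):
    """Return (signed_message_bytes, signature_bytes) from an Authorization header."""
    # Drop an optional "Bearer " prefix; a bare token passes through unchanged.
    token = auth_header.split(" ")[-1]
    # The signature covers "<header>.<payload>", exactly as transmitted.
    message, encoded_signature = token.rsplit(".", 1)
    return message.encode("utf-8"), b64url_decode(encoded_signature)


# Hypothetical token with a two-byte "signature", for illustration only.
message, signature = split_token("Bearer aaa.bbb.-_8")
print(message, signature)
```

With python-jose, the already-imported `base64url_decode` should do the same job when given the encoded signature as bytes, and the resulting pair would then be passed to `public_key.verify(message, signature)` with the message as bytes too.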

jcv*_*rde 12

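I see you are using jose, whereas I am using pyjwt, but this solution may still help you. Most of the bulk code at the bottom comes from the "api-gateway-authorizer-python" blueprint. Note that this is very fragile code that will break if anything goes wrong. I ended up not using Lambda authentication and chose AWS_IAM authentication for API Gateway with identity pools instead, so I never finished it.

This example requires you to install pyjwt and cryptography with pip in your working directory and upload everything as a .zip file.

If you want to consider the AWS_IAM authentication option, I recommend watching this video: https://www.youtube.com/watch?v=VZqG7HjT2AQ

They also provide a more elaborate Lambda authorizer implementation on GitHub at https://github.com/awslabs/aws-serverless-auth-reference-app (they show the link at the beginning of the video), but I don't know what its pip dependencies are.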
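
Whichever library verifies the signature, the decoded claims still need checking before the token is trusted. A minimal sketch, assuming the claims are already decoded into a dict and that the pool follows Cognito's usual conventions: ID tokens carry the app client id in `aud`, access tokens carry it in `client_id`, and `iss` is the user pool URL. The function name `check_claims` and all sample values are hypothetical.

```python
import time


def check_claims(claims, region, userpool_id, app_client_id, now=None):
    """Return a list of problems found in already-decoded token claims."""
    now = time.time() if now is None else now
    problems = []
    # A missing "exp" is treated as expired.
    if now >= claims.get("exp", 0):
        problems.append("token is expired")
    issuer = "https://cognito-idp.{}.amazonaws.com/{}".format(region, userpool_id)
    if claims.get("iss") != issuer:
        problems.append("unexpected issuer")
    # The claim that names the app client depends on the token type.
    token_use = claims.get("token_use")
    if token_use == "id":
        client = claims.get("aud")
    elif token_use == "access":
        client = claims.get("client_id")
    else:
        client = None
        problems.append("unexpected token_use")
    if client != app_client_id:
        problems.append("token was not issued for this app client")
    return problems


# Hypothetical claims, for illustration only.
sample = {
    "exp": 2000,
    "iss": "https://cognito-idp.us-east-1.amazonaws.com/us-east-1_example",
    "token_use": "id",
    "aud": "example-client",
}
print(check_claims(sample, "us-east-1", "us-east-1_example", "example-client", now=1000))
```

In the pyjwt code that follows, `jwt.decode(..., audience=appClientId)` handles the `exp` and `aud` checks itself, which as far as I know means it is suited to ID tokens rather than access tokens.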

from __future__ import print_function
from jwt.algorithms import RSAAlgorithm

import re
import jwt
import json
import sys
import urllib.request  # Python 3: urlopen lives in urllib.request

region = 'your-region'
userpoolId = 'your-user-pool-id'
appClientId = 'your-app-client-id'
keysUrl = 'https://cognito-idp.{}.amazonaws.com/{}/.well-known/jwks.json'.format(region, userpoolId)

def lambda_handler(event, context):
    bearerToken = event['authorizationToken']
    methodArn = event['methodArn']
    print("Client token: " + bearerToken)
    print("Method ARN: " + methodArn)

    response = urllib.request.urlopen(keysUrl)
    keys = json.loads(response.read())['keys']

    jwtToken = bearerToken.split(' ')[-1]
    header = jwt.get_unverified_header(jwtToken)
    kid = header['kid']

    jwkValue = findJwkValue(keys, kid)
    publicKey = RSAAlgorithm.from_jwk(json.dumps(jwkValue))

    decoded = decodeJwtToken(jwtToken, publicKey)
    print('Decoded token: ' + json.dumps(decoded))
    principalId = decoded['cognito:username']

    methodArn = event['methodArn'].split(':')
    apiGatewayArnTmp = methodArn[5].split('/')
    awsAccountId = methodArn[4]

    policy = AuthPolicy(principalId, awsAccountId)
    policy.restApiId = apiGatewayArnTmp[0]
    policy.region = methodArn[3]
    policy.stage = apiGatewayArnTmp[1]
    #policy.denyAllMethods()
    policy.allowAllMethods()

    # Finally, build the policy
    authResponse = policy.build()

    # new! -- add additional key-value pairs associated with the authenticated principal
    # these are made available by APIGW like so: $context.authorizer.<key>
    # additional context is cached
    context = {
        'key': 'value',  # $context.authorizer.key -> value
        'number': 1,
        'bool': True
    }
    # context['arr'] = ['foo'] <- this is invalid, APIGW will not accept it
    # context['obj'] = {'foo':'bar'} <- also invalid

    authResponse['context'] = context

    return authResponse

def findJwkValue(keys, kid):
    for key in keys:
        if key['kid'] == kid:
            return key

def decodeJwtToken(token, publicKey):
    try:
        decoded=jwt.decode(token, publicKey, algorithms=['RS256'], audience=appClientId)
        return decoded
    except Exception as e:
        print(e)
        raise

class HttpVerb:
    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    PATCH = 'PATCH'
    HEAD = 'HEAD'
    DELETE = 'DELETE'
    OPTIONS = 'OPTIONS'
    ALL = '*'


class AuthPolicy(object):
    # The AWS account id the policy will be generated for. This is used to create the method ARNs.
    awsAccountId = ''
    # The principal used for the policy, this should be a unique identifier for the end user.
    principalId = ''
    # The policy version used for the evaluation. This should always be '2012-10-17'
    version = '2012-10-17'
    # The regular expression used to validate resource paths for the policy
    pathRegex = r'^[/.a-zA-Z0-9-\*]+$'

    '''Internal lists of allowed and denied methods.

    These are lists of objects and each object has 2 properties: A resource
    ARN and a nullable conditions statement. The build method processes these
    lists and generates the appropriate statements for the final policy.
    '''
    allowMethods = []
    denyMethods = []

    # The API Gateway API id. By default this is set to '*'
    restApiId = '*'
    # The region where the API is deployed. By default this is set to '*'
    region = '*'
    # The name of the stage used in the policy. By default this is set to '*'
    stage = '*'

    def __init__(self, principal, awsAccountId):
        self.awsAccountId = awsAccountId
        self.principalId = principal
        self.allowMethods = []
        self.denyMethods = []

    def _addMethod(self, effect, verb, resource, conditions):
        '''Adds a method to the internal lists of allowed or denied methods. Each object in
        the internal list contains a resource ARN and a condition statement. The condition
        statement can be null.'''
        if verb != '*' and not hasattr(HttpVerb, verb):
            raise NameError('Invalid HTTP verb ' + verb + '. Allowed verbs in HttpVerb class')
        resourcePattern = re.compile(self.pathRegex)
        if not resourcePattern.match(resource):
            raise NameError('Invalid resource path: ' + resource + '. Path should match ' + self.pathRegex)

        if resource[:1] == '/':
            resource = resource[1:]

        resourceArn = 'arn:aws:execute-api:{}:{}:{}/{}/{}/{}'.format(self.region, self.awsAccountId, self.restApiId, self.stage, verb, resource)

        if effect.lower() == 'allow':
            self.allowMethods.append({
                'resourceArn': resourceArn,
                'conditions': conditions
            })
        elif effect.lower() == 'deny':
            self.denyMethods.append({
                'resourceArn': resourceArn,
                'conditions': conditions
            })

    def _getEmptyStatement(self, effect):
        '''Returns an empty statement object prepopulated with the correct action and the
        desired effect.'''
        statement = {
            'Action': 'execute-api:Invoke',
            'Effect': effect[:1].upper() + effect[1:].lower(),
            'Resource': []
        }

        return statement

    def _getStatementForEffect(self, effect, methods):
        '''This function loops over an array of objects containing a resourceArn and
        conditions statement and generates the array of statements for the policy.'''
        statements = []

        if len(methods) > 0:
            statement = self._getEmptyStatement(effect)

            for curMethod in methods:
                if curMethod['conditions'] is None or len(curMethod['conditions']) == 0:
                    statement['Resource'].append(curMethod['resourceArn'])
                else:
                    conditionalStatement = self._getEmptyStatement(effect)
                    conditionalStatement['Resource'].append(curMethod['resourceArn'])
                    conditionalStatement['Condition'] = curMethod['conditions']
                    statements.append(conditionalStatement)

            if statement['Resource']:
                statements.append(statement)

        return statements

    def allowAllMethods(self):
        '''Adds a '*' allow to the policy to authorize access to all methods of an API'''
        self._addMethod('Allow', HttpVerb.ALL, '*', [])

    def denyAllMethods(self):
        '''Adds a '*' deny to the policy to deny access to all methods of an API'''
        self._addMethod('Deny', HttpVerb.ALL, '*', [])

    def allowMethod(self, verb, resource):
        '''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
        methods for the policy'''
        self._addMethod('Allow', verb, resource, [])

    def denyMethod(self, verb, resource):
        '''Adds an API Gateway method (Http verb + Resource path) to the list of denied
        methods for the policy'''
        self._addMethod('Deny', verb, resource, [])

    def allowMethodWithConditions(self, verb, resource, conditions):
        '''Adds an API Gateway method (Http verb + Resource path) to the list of allowed
        methods and includes a condition for the policy statement. More on AWS policy
        conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
        self._addMethod('Allow', verb, resource, conditions)

    def denyMethodWithConditions(self, verb, resource, conditions):
        '''Adds an API Gateway method (Http verb + Resource path) to the list of denied
        methods and includes a condition for the policy statement. More on AWS policy
        conditions here: http://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition'''
        self._addMethod('Deny', verb, resource, conditions)

    def build(self):
        '''Generates the policy document based on the internal lists of allowed and denied
        conditions. This will generate a policy with two main statements for the effect:
        one statement for Allow and one statement for Deny.
        Methods that include conditions will have their own statement in the policy.'''
        if ((self.allowMethods is None or len(self.allowMethods) == 0) and
                (self.denyMethods is None or len(self.denyMethods) == 0)):
            raise NameError('No statements defined for the policy')

        policy = {
            'principalId': self.principalId,
            'policyDocument': {
                'Version': self.version,
                'Statement': []
            }
        }

        policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Allow', self.allowMethods))
        policy['policyDocument']['Statement'].extend(self._getStatementForEffect('Deny', self.denyMethods))

        return policy
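
To make the shape of the authorizer response easier to see, here is a condensed sketch of what `lambda_handler` and `AuthPolicy.build()` produce together in the allow-all case, before the extra `context` dictionary is attached. The function name `allow_all_policy`, the ARN, and the user name are hypothetical; the real class above also supports deny statements and conditions, which this sketch leaves out.

```python
def allow_all_policy(principal_id, method_arn):
    """Build an allow-all authorizer response from an API Gateway method ARN."""
    # arn:aws:execute-api:<region>:<account>:<apiId>/<stage>/<verb>/<resource>
    arn_parts = method_arn.split(":")
    region, account_id = arn_parts[3], arn_parts[4]
    api_id, stage = arn_parts[5].split("/")[:2]
    # '*' for both the verb and the resource path allows every method.
    resource = "arn:aws:execute-api:{}:{}:{}/{}/*/*".format(
        region, account_id, api_id, stage)
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Action": "execute-api:Invoke",
                "Effect": "Allow",
                "Resource": [resource],
            }],
        },
    }


# Hypothetical ARN and user name, for illustration only.
response = allow_all_policy(
    "example-user",
    "arn:aws:execute-api:us-east-1:123456789012:abcdef1234/prod/GET/items")
print(response["policyDocument"]["Statement"][0]["Resource"])
```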