Does authorize_access_token also validate the ID token?

Che*_*ery 1 python oauth-2.0 authlib

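To retrieve OAuth / OpenID Connect tokens, the Authlib documentation uses the function authorize_access_token. OAuth providers such as Google strongly recommend validating these tokens manually, for example by checking the expiration date.

Where is the documentation for authorize_access_token? I could not find anything on the website. Does the function validate the token automatically, or do I have to validate it myself?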

Lif*_*lex 5

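You are right: Authlib's documentation says very little about what authorize_access_token() does. The Web OAuth Clients section of the documentation gives some hints, but that section only mentions the return value of authorize_access_token().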

    # this only returns a token
    token = oauth.providername.authorize_access_token()

To fully understand what authorize_access_token() does, we need to explore Authlib's GitHub repository.

Below is the part of the Django client code that makes the OAuth 2 request.

    def authorize_access_token(self, request, **kwargs):
        """Fetch access token in one step.
        :param request: HTTP request instance from Django view.
        :return: A token dict.
        """
        if request.method == 'GET':
            error = request.GET.get('error')
            if error:
                description = request.GET.get('error_description')
                raise OAuthError(error=error, description=description)
            params = {
                'code': request.GET.get('code'),
                'state': request.GET.get('state'),
            }
        else:
            params = {
                'code': request.POST.get('code'),
                'state': request.POST.get('state'),
            }

        state_data = self.framework.get_state_data(request.session, params.get('state'))
        params = self._format_state_params(state_data, params)
        token = self.fetch_access_token(**params, **kwargs)

        if 'id_token' in token and 'nonce' in state_data:
            userinfo = self.parse_id_token(token, nonce=state_data['nonce'])
            token['userinfo'] = userinfo
        return token
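
The last few lines of that method decide whether the ID token is parsed at all. Here is a minimal sketch of that branch using only the standard library. The names `attach_userinfo` and `fake_parse` are hypothetical, made up for illustration, and are not part of Authlib:

```python
def attach_userinfo(token, state_data, parse_id_token):
    """Mirror the final branch of authorize_access_token().

    parse_id_token is any callable taking (token, nonce=...); in Authlib
    that role is played by the client's own parse_id_token method.
    """
    if 'id_token' in token and 'nonce' in state_data:
        token['userinfo'] = parse_id_token(token, nonce=state_data['nonce'])
    return token


def fake_parse(token, nonce):
    # Hypothetical stand-in: the real method verifies and decodes the JWT.
    return {'sub': 'user-1', 'nonce': nonce}


# Response with an id_token: userinfo gets attached.
with_id = attach_userinfo(
    {'access_token': 'a', 'id_token': 'x.y.z'}, {'nonce': 'n-1'}, fake_parse)

# Plain OAuth 2 response without an id_token: nothing is parsed.
without_id = attach_userinfo({'access_token': 'a'}, {'nonce': 'n-1'}, fake_parse)

print('userinfo' in with_id, 'userinfo' in without_id)
```

In other words, if the response carries no id_token, or the flow was started without a nonce, the token comes back without a userinfo entry and no ID token validation has taken place.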

In the authorize_access_token() code above we have this call:

    error = request.GET.get('error')

Errors on this path are reported through the exception classes in the base_client code, which include:

  • MismatchingStateError
  • MissingRequestTokenError
  • MissingTokenError

One of the sub-calls in the base_client code is this hook, which subclasses are expected to override:

    def _on_update_token(self, token, refresh_token=None, access_token=None):
        raise NotImplementedError()

If the provider reports a problem in the callback, authorize_access_token() raises an error:

    if error:
        description = request.GET.get('error_description')
        raise OAuthError(error=error, description=description)

Each call inside authorize_access_token() also performs its own checks using the base_client code, which is quite extensive.

    state_data = self.framework.get_state_data(request.session, params.get('state'))
    params = self._format_state_params(state_data, params)
    token = self.fetch_access_token(**params, **kwargs)

The first call, which produces state_data, runs this function:

    def get_state_data(self, session, state):
        key = f'_state_{self.name}_{state}'
        if self.cache:
            value = self._get_cache_data(key)
        else:
            value = session.get(key)
        if value:
            return value.get('data')
        return None
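
To make the lookup concrete, here is a small standalone sketch of the session branch only (no cache), with a plain dict standing in for the Django session. The provider name `google`, the state values, and the stored data are all made-up examples:

```python
def get_state_data(session, name, state):
    """Look up the data stored under the per-provider, per-state key."""
    key = f'_state_{name}_{state}'
    value = session.get(key)
    if value:
        return value.get('data')
    return None


# Hypothetical session contents, shaped the way the lookup expects.
session = {
    '_state_google_abc123': {
        'data': {'nonce': 'n-1', 'redirect_uri': 'https://example.com/cb'},
    },
}

found = get_state_data(session, 'google', 'abc123')
missing = get_state_data(session, 'google', 'tampered')
print(found, missing)
```

A state value that was never stored in the session simply yields None, and the next step turns that into an error.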

The second call, which produces params, runs this function:

    def _format_state_params(state_data, params):
        if state_data is None:
            raise MismatchingStateError()

        code_verifier = state_data.get('code_verifier')
        if code_verifier:
            params['code_verifier'] = code_verifier

        redirect_uri = state_data.get('redirect_uri')
        if redirect_uri:
            params['redirect_uri'] = redirect_uri
        return params
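
This is where a forged or stale state parameter is rejected. A self-contained sketch of the same check follows; the exception class here is a local stand-in with the same name as Authlib's, and `format_state_params` is a simplified rewrite rather than the library function itself:

```python
class MismatchingStateError(Exception):
    """Local stand-in for Authlib's exception of the same name."""


def format_state_params(state_data, params):
    # No stored data for this state means the callback does not match
    # any authorization request we started.
    if state_data is None:
        raise MismatchingStateError()
    for key in ('code_verifier', 'redirect_uri'):
        if state_data.get(key):
            params[key] = state_data[key]
    return params


merged = format_state_params(
    {'redirect_uri': 'https://example.com/cb'},
    {'code': 'c', 'state': 's'},
)

try:
    format_state_params(None, {'code': 'c', 'state': 'forged'})
    rejected = False
except MismatchingStateError:
    rejected = True

print(merged, rejected)
```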

The third call, which produces token, runs fetch_access_token. The version quoted here is the OAuth 1 variant, as its request_token parameter and docstring indicate:

    def fetch_access_token(self, request_token=None, **kwargs):
        """Fetch access token in one step.
        :param request_token: A previous request token for OAuth 1.
        :param kwargs: Extra parameters to fetch access token.
        :return: A token dict.
        """
        with self._get_oauth_client() as client:
            if request_token is None:
                raise MissingRequestTokenError()
            # merge request token with verifier
            token = {}
            token.update(request_token)
            token.update(kwargs)
            client.token = token
            params = self.access_token_params or {}
            token = client.fetch_access_token(self.access_token_url, **params)
        return token

The last call in authorize_access_token() is parse_id_token, which in the base_client code looks like this (async variant shown):

    async def parse_id_token(self, token, nonce, claims_options=None):
        """Return an instance of UserInfo from token's ``id_token``."""
        claims_params = dict(
            nonce=nonce,
            client_id=self.client_id,
        )
        if 'access_token' in token:
            claims_params['access_token'] = token['access_token']
            claims_cls = CodeIDToken
        else:
            claims_cls = ImplicitIDToken

        metadata = await self.load_server_metadata()
        if claims_options is None and 'issuer' in metadata:
            claims_options = {'iss': {'values': [metadata['issuer']]}}

        alg_values = metadata.get('id_token_signing_alg_values_supported')
        if not alg_values:
            alg_values = ['RS256']

        jwt = JsonWebToken(alg_values)

        jwk_set = await self.fetch_jwk_set()
        try:
            claims = jwt.decode(
                token['id_token'],
                key=JsonWebKey.import_key_set(jwk_set),
                claims_cls=claims_cls,
                claims_options=claims_options,
                claims_params=claims_params,
            )
        except ValueError:
            jwk_set = await self.fetch_jwk_set(force=True)
            claims = jwt.decode(
                token['id_token'],
                key=JsonWebKey.import_key_set(jwk_set),
                claims_cls=claims_cls,
                claims_options=claims_options,
                claims_params=claims_params,
            )

        # https://github.com/lepture/authlib/issues/259
        if claims.get('nonce_supported') is False:
            claims.params['nonce'] = None
        claims.validate(leeway=120)
        return UserInfo(claims)
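
To illustrate the kind of claim checks that parse_id_token sets up (issuer, client ID, nonce, and expiry with a 120-second leeway), here is a simplified standard-library sketch that works on already-decoded claims. It is not Authlib's implementation: signature verification against the JWK set is deliberately left out, and `check_id_token_claims` and all sample values are hypothetical:

```python
import time


def check_id_token_claims(claims, issuer, client_id, nonce, leeway=120, now=None):
    """Return the names of claims that fail a basic ID token check."""
    now = time.time() if now is None else now
    problems = []
    if claims.get('iss') != issuer:
        problems.append('iss')
    # aud may be a single string or a list of audiences.
    aud = claims.get('aud')
    audiences = aud if isinstance(aud, list) else [aud]
    if client_id not in audiences:
        problems.append('aud')
    if claims.get('nonce') != nonce:
        problems.append('nonce')
    # Tolerate a little clock skew before treating the token as expired.
    exp = claims.get('exp')
    if exp is None or now > exp + leeway:
        problems.append('exp')
    return problems


claims = {
    'iss': 'https://accounts.example.com',
    'aud': 'my-client',
    'nonce': 'n-1',
    'exp': 1_000,
}
args = ('https://accounts.example.com', 'my-client')

ok = check_id_token_claims(claims, *args, nonce='n-1', now=1_100)
expired = check_id_token_claims(claims, *args, nonce='n-1', now=1_200)
wrong_nonce = check_id_token_claims(claims, *args, nonce='other', now=1_100)
print(ok, expired, wrong_nonce)
```

The `now` parameter exists only so the example is deterministic; a real check would use the current time.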

All of the functions above call further functions in the base_client code. For example, here is another part of the code that is reached from authorize_access_token():

    from requests import Session
    from requests.auth import AuthBase
    from authlib.oauth2.client import OAuth2Client
    from authlib.oauth2.auth import ClientAuth, TokenAuth
    from ..base_client import (
        OAuthError,
        InvalidTokenError,
        MissingTokenError,
        UnsupportedTokenTypeError,
    )
    from .utils import update_session_configure

    __all__ = ['OAuth2Session', 'OAuth2Auth']


    class OAuth2Auth(AuthBase, TokenAuth):
        """Sign requests for OAuth 2.0, currently only bearer token is supported."""

        def ensure_active_token(self):
            if self.client and not self.client.ensure_active_token(self.token):
                raise InvalidTokenError()

        def __call__(self, req):
            self.ensure_active_token()
            try:
                req.url, req.headers, req.body = self.prepare(
                    req.url, req.headers, req.body)
            except KeyError as error:
                description = 'Unsupported token_type: {}'.format(str(error))
                raise UnsupportedTokenTypeError(description=description)
            return req


    class OAuth2ClientAuth(AuthBase, ClientAuth):
        """Attaches OAuth Client Authentication to the given Request object.
        """
        def __call__(self, req):
            req.url, req.headers, req.body = self.prepare(
                req.method, req.url, req.headers, req.body
            )
            return req


    class OAuth2Session(OAuth2Client, Session):
        """Construct a new OAuth 2 client requests session.
        :param client_id: Client ID, which you get from client registration.
        :param client_secret: Client Secret, which you get from registration.
        :param authorization_endpoint: URL of the authorization server's
            authorization endpoint.
        :param token_endpoint: URL of the authorization server's token endpoint.
        :param token_endpoint_auth_method: client authentication method for
            token endpoint.
        :param revocation_endpoint: URL of the authorization server's OAuth 2.0
            revocation endpoint.
        :param revocation_endpoint_auth_method: client authentication method for
            revocation endpoint.
        :param scope: Scope that you needed to access user resources.
        :param redirect_uri: Redirect URI you registered as callback.
        :param token: A dict of token attributes such as ``access_token``,
            ``token_type`` and ``expires_at``.
        :param token_placement: The place to put token in HTTP request. Available
            values: "header", "body", "uri".
        :param update_token: A function for you to update token. It accept a
            :class:`OAuth2Token` as parameter.
        """
        client_auth_class = OAuth2ClientAuth
        token_auth_class = OAuth2Auth
        SESSION_REQUEST_PARAMS = (
            'allow_redirects', 'timeout', 'cookies', 'files',
            'proxies', 'hooks', 'stream', 'verify', 'cert', 'json'
        )

        def __init__(self, client_id=None, client_secret=None,
                     token_endpoint_auth_method=None,
                     revocation_endpoint_auth_method=None,
                     scope=None, redirect_uri=None,
                     token=None, token_placement='header',
                     update_token=None, **kwargs):

            Session.__init__(self)
            update_session_configure(self, kwargs)

            OAuth2Client.__init__(
                self, session=self,
                client_id=client_id, client_secret=client_secret,
                token_endpoint_auth_method=token_endpoint_auth_method,
                revocation_endpoint_auth_method=revocation_endpoint_auth_method,
                scope=scope, redirect_uri=redirect_uri,
                token=token, token_placement=token_placement,
                update_token=update_token, **kwargs
            )

        def fetch_access_token(self, url=None, **kwargs):
            """Alias for fetch_token."""
            return self.fetch_token(url, **kwargs)

        def request(self, method, url, withhold_token=False, auth=None, **kwargs):
            """Send request with auto refresh token feature (if available)."""
            if not withhold_token and auth is None:
                if not self.token:
                    raise MissingTokenError()
                auth = self.token_auth
            return super(OAuth2Session, self).request(
                method, url, auth=auth, **kwargs)

        @staticmethod
        def handle_error(error_type, error_description):
            raise OAuthError(error_type, error_description)
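
The docstring above notes that the token dict can carry an ``expires_at`` attribute, and ensure_active_token() is called before every signed request. How Authlib decides that a token is still active is not shown in this excerpt, so the following is only a sketch of a manual expiry check on such a dict, in the spirit of what providers like Google recommend. The helper `is_expired` and its 60-second default leeway are assumptions made for illustration:

```python
import time


def is_expired(token, leeway=60, now=None):
    """Return True/False for a token dict, or None if expiry is unknown."""
    expires_at = token.get('expires_at')
    if expires_at is None:
        return None  # the provider did not tell us when the token expires
    now = time.time() if now is None else now
    # Treat the token as expired slightly early so it is not used
    # in the last seconds of its lifetime.
    return expires_at - leeway < now


fresh = is_expired({'access_token': 'a', 'expires_at': 1_000}, now=900)
stale = is_expired({'access_token': 'a', 'expires_at': 1_000}, now=950)
unknown = is_expired({'access_token': 'a'})
print(fresh, stale, unknown)
```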

Here are the claims that the validation code knows about. The two lists come from separate definitions in the source:

    # Registered JWT claims (RFC 7519)
    REGISTERED_CLAIMS = ['iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti']

    # Client metadata claims (RFC 7591), defined separately
    REGISTERED_CLAIMS = [
        'redirect_uris',
        'token_endpoint_auth_method',
        'grant_types',
        'response_types',
        'client_name',
        'client_uri',
        'logo_uri',
        'scope',
        'contacts',
        'tos_uri',
        'policy_uri',
        'jwks_uri',
        'jwks',
        'software_id',
        'software_version',
    ]

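So Authlib performs checks at each stage of the OAuth / OpenID Connect flow. In particular, when the token response contains an id_token and a nonce was stored with the state, parse_id_token verifies the token against the provider's JWK set and validates its claims (issuer, nonce, client ID, and expiry with a 120-second leeway).

I hope this answer helps you better understand the function authorize_access_token().

Happy coding!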