Che*_*ery 1 python oauth-2.0 authlib
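To retrieve OAuth / OpenID Connect tokens, the authlib documentation uses the function `authorize_access_token`. OAuth providers such as Google strongly recommend validating these tokens manually, for example by checking the expiry date.

Where is the documentation for `authorize_access_token`? I can't find anything on the website. Does the function validate the token automatically, or do I have to validate it myself?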
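You are correct: authlib's documentation says very little about what `authorize_access_token()` does. The Web OAuth Clients section of the documentation gives some hints, but that section only mentions the return value of `authorize_access_token()`.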
```python
# this only returns a token
token = oauth.providername.authorize_access_token()
```

To fully understand what `authorize_access_token()` does, we need to explore authlib's GitHub repository.
Below is the code from the Django client that handles the OAuth 2 request.
```python
def authorize_access_token(self, request, **kwargs):
    """Fetch access token in one step.
    :param request: HTTP request instance from Django view.
    :return: A token dict.
    """
    if request.method == 'GET':
        error = request.GET.get('error')
        if error:
            description = request.GET.get('error_description')
            raise OAuthError(error=error, description=description)
        params = {
            'code': request.GET.get('code'),
            'state': request.GET.get('state'),
        }
    else:
        params = {
            'code': request.POST.get('code'),
            'state': request.POST.get('state'),
        }

    state_data = self.framework.get_state_data(request.session, params.get('state'))
    params = self._format_state_params(state_data, params)
    token = self.fetch_access_token(**params, **kwargs)

    if 'id_token' in token and 'nonce' in state_data:
        userinfo = self.parse_id_token(token, nonce=state_data['nonce'])
        token['userinfo'] = userinfo
    return token
```

In the `authorize_access_token()` code above we have this call:
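```python
error = request.GET.get('error')
```

This reads the `error` parameter from the callback request. The `OAuthError` raised for it comes from the base_client code, which contains further validations.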
One of the methods in the base_client code is this:
```python
def _on_update_token(self, token, refresh_token=None, access_token=None):
    raise NotImplementedError()
```

If something goes wrong, `authorize_access_token()` raises an error:
```python
if error:
    description = request.GET.get('error_description')
    raise OAuthError(error=error, description=description)
```

Each call inside `authorize_access_token()` also performs various checks through the base_client code, which is quite extensive.
```python
state_data = self.framework.get_state_data(request.session, params.get('state'))
params = self._format_state_params(state_data, params)
token = self.fetch_access_token(**params, **kwargs)
```

The first call, which assigns `state_data`, invokes this function:
```python
def get_state_data(self, session, state):
    key = f'_state_{self.name}_{state}'
    if self.cache:
        value = self._get_cache_data(key)
    else:
        value = session.get(key)
    if value:
        return value.get('data')
    return None
```

The second call, which assigns `params`, invokes this function:
```python
def _format_state_params(state_data, params):
    if state_data is None:
        raise MismatchingStateError()

    code_verifier = state_data.get('code_verifier')
    if code_verifier:
        params['code_verifier'] = code_verifier

    redirect_uri = state_data.get('redirect_uri')
    if redirect_uri:
        params['redirect_uri'] = redirect_uri
    return params
```

The third call, which assigns `token`, invokes this function:
```python
def fetch_access_token(self, request_token=None, **kwargs):
    """Fetch access token in one step.
    :param request_token: A previous request token for OAuth 1.
    :param kwargs: Extra parameters to fetch access token.
    :return: A token dict.
    """
    with self._get_oauth_client() as client:
        if request_token is None:
            raise MissingRequestTokenError()
        # merge request token with verifier
        token = {}
        token.update(request_token)
        token.update(kwargs)
        client.token = token
        params = self.access_token_params or {}
        token = client.fetch_access_token(self.access_token_url, **params)
    return token
```

Note that this snippet takes a `request_token`, which its docstring describes as an OAuth 1 request token, so the OAuth 2 path may go through a different `fetch_access_token()` implementation.

The last call in `authorize_access_token()` is `parse_id_token()`, which maps to this function in the base_client code (shown here in its async variant):
```python
async def parse_id_token(self, token, nonce, claims_options=None):
    """Return an instance of UserInfo from token's ``id_token``."""
    claims_params = dict(
        nonce=nonce,
        client_id=self.client_id,
    )
    if 'access_token' in token:
        claims_params['access_token'] = token['access_token']
        claims_cls = CodeIDToken
    else:
        claims_cls = ImplicitIDToken

    metadata = await self.load_server_metadata()
    if claims_options is None and 'issuer' in metadata:
        claims_options = {'iss': {'values': [metadata['issuer']]}}

    alg_values = metadata.get('id_token_signing_alg_values_supported')
    if not alg_values:
        alg_values = ['RS256']

    jwt = JsonWebToken(alg_values)

    jwk_set = await self.fetch_jwk_set()
    try:
        claims = jwt.decode(
            token['id_token'],
            key=JsonWebKey.import_key_set(jwk_set),
            claims_cls=claims_cls,
            claims_options=claims_options,
            claims_params=claims_params,
        )
    except ValueError:
        jwk_set = await self.fetch_jwk_set(force=True)
        claims = jwt.decode(
            token['id_token'],
            key=JsonWebKey.import_key_set(jwk_set),
            claims_cls=claims_cls,
            claims_options=claims_options,
            claims_params=claims_params,
        )

    # https://github.com/lepture/authlib/issues/259
    if claims.get('nonce_supported') is False:
        claims.params['nonce'] = None
    claims.validate(leeway=120)
    return UserInfo(claims)
```

All of the functions above call further functions in the base_client code. For example, here is another part of the code that is reached from `authorize_access_token()`:
```python
from requests import Session
from requests.auth import AuthBase
from authlib.oauth2.client import OAuth2Client
from authlib.oauth2.auth import ClientAuth, TokenAuth
from ..base_client import (
    OAuthError,
    InvalidTokenError,
    MissingTokenError,
    UnsupportedTokenTypeError,
)
from .utils import update_session_configure

__all__ = ['OAuth2Session', 'OAuth2Auth']


class OAuth2Auth(AuthBase, TokenAuth):
    """Sign requests for OAuth 2.0, currently only bearer token is supported."""

    def ensure_active_token(self):
        if self.client and not self.client.ensure_active_token(self.token):
            raise InvalidTokenError()

    def __call__(self, req):
        self.ensure_active_token()
        try:
            req.url, req.headers, req.body = self.prepare(
                req.url, req.headers, req.body)
        except KeyError as error:
            description = 'Unsupported token_type: {}'.format(str(error))
            raise UnsupportedTokenTypeError(description=description)
        return req


class OAuth2ClientAuth(AuthBase, ClientAuth):
    """Attaches OAuth Client Authentication to the given Request object.
    """
    def __call__(self, req):
        req.url, req.headers, req.body = self.prepare(
            req.method, req.url, req.headers, req.body
        )
        return req


class OAuth2Session(OAuth2Client, Session):
    """Construct a new OAuth 2 client requests session.
    :param client_id: Client ID, which you get from client registration.
    :param client_secret: Client Secret, which you get from registration.
    :param authorization_endpoint: URL of the authorization server's
        authorization endpoint.
    :param token_endpoint: URL of the authorization server's token endpoint.
    :param token_endpoint_auth_method: client authentication method for
        token endpoint.
    :param revocation_endpoint: URL of the authorization server's OAuth 2.0
        revocation endpoint.
    :param revocation_endpoint_auth_method: client authentication method for
        revocation endpoint.
    :param scope: Scope that you needed to access user resources.
    :param redirect_uri: Redirect URI you registered as callback.
    :param token: A dict of token attributes such as ``access_token``,
        ``token_type`` and ``expires_at``.
    :param token_placement: The place to put token in HTTP request. Available
        values: "header", "body", "uri".
    :param update_token: A function for you to update token. It accept a
        :class:`OAuth2Token` as parameter.
    """
    client_auth_class = OAuth2ClientAuth
    token_auth_class = OAuth2Auth
    SESSION_REQUEST_PARAMS = (
        'allow_redirects', 'timeout', 'cookies', 'files',
        'proxies', 'hooks', 'stream', 'verify', 'cert', 'json'
    )

    def __init__(self, client_id=None, client_secret=None,
                 token_endpoint_auth_method=None,
                 revocation_endpoint_auth_method=None,
                 scope=None, redirect_uri=None,
                 token=None, token_placement='header',
                 update_token=None, **kwargs):

        Session.__init__(self)
        update_session_configure(self, kwargs)

        OAuth2Client.__init__(
            self, session=self,
            client_id=client_id, client_secret=client_secret,
            token_endpoint_auth_method=token_endpoint_auth_method,
            revocation_endpoint_auth_method=revocation_endpoint_auth_method,
            scope=scope, redirect_uri=redirect_uri,
            token=token, token_placement=token_placement,
            update_token=update_token, **kwargs
        )

    def fetch_access_token(self, url=None, **kwargs):
        """Alias for fetch_token."""
        return self.fetch_token(url, **kwargs)

    def request(self, method, url, withhold_token=False, auth=None, **kwargs):
        """Send request with auto refresh token feature (if available)."""
        if not withhold_token and auth is None:
            if not self.token:
                raise MissingTokenError()
            auth = self.token_auth
        return super(OAuth2Session, self).request(
            method, url, auth=auth, **kwargs)

    @staticmethod
    def handle_error(error_type, error_description):
        raise OAuthError(error_type, error_description)
```

The following claim lists are relevant to validation:
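```python
# JWT registered claim names (RFC 7519)
REGISTERED_CLAIMS = ['iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti']

# OAuth 2.0 client metadata fields (RFC 7591), defined in a separate class
REGISTERED_CLAIMS = [
    'redirect_uris',
    'token_endpoint_auth_method',
    'grant_types',
    'response_types',
    'client_name',
    'client_uri',
    'logo_uri',
    'scope',
    'contacts',
    'tos_uri',
    'policy_uri',
    'jwks_uri',
    'jwks',
    'software_id',
    'software_version',
]
```

At every stage of the OAuth/OpenID flow, authlib runs several checks to validate the token. In particular, `parse_id_token()` decodes the ID token against the provider's key set and calls `claims.validate(leeway=120)`, so the ID token's claims are validated for you whenever an `id_token` is present.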
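If you still want an explicit expiry check of your own, as the question suggests, it could look roughly like the sketch below. This is only an illustration using the standard library, not authlib's implementation: the helper names `is_unexpired` and `check_token` are made up, and it assumes the token dict carries an `expires_at` Unix timestamp and that `token['userinfo']` exposes the ID token's `exp` claim. The 120 second leeway simply mirrors the `leeway=120` seen in `parse_id_token()`. It does not verify any signature.

```python
import time


def is_unexpired(expires_at, leeway=0, now=None):
    """Return True while `expires_at` (Unix seconds) plus `leeway` has not passed."""
    if expires_at is None:
        # A missing expiry cannot be confirmed, so treat it as not valid.
        return False
    current = time.time() if now is None else now
    return current <= expires_at + leeway


def check_token(token, leeway=120, now=None):
    """Hypothetical helper: report the expiry status of a token dict."""
    userinfo = token.get("userinfo") or {}
    return {
        # Access token lifetime, assuming an `expires_at` key is present.
        "access_token_ok": is_unexpired(token.get("expires_at"), 0, now),
        # ID token `exp` claim, with leeway to tolerate clock skew.
        "id_token_ok": is_unexpired(userinfo.get("exp"), leeway, now),
    }


# Made-up values; `now` is pinned so the result is deterministic.
sample_token = {
    "access_token": "example",
    "expires_at": 1_700_000_100,
    "userinfo": {"sub": "user-1", "exp": 1_700_000_000},
}
print(check_token(sample_token, now=1_700_000_050))
```

In a real callback view you would pass the dict returned by `authorize_access_token()` and omit `now`, so the current time is used.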
I hope my answer helps you better understand the `authorize_access_token()` function.

Happy coding!