使用Django OAuth2 Toolkit以编程方式生成访问令牌

Fel*_* D. 8 django oauth-2.0 python-social-auth

我正在使用Python Social Auth和Django OAuth Toolkit来管理我的用户帐户并限制对我的REST API的访问.

我可以为使用常规手动注册我的应用的用户创建令牌

curl -X POST -d "grant_type=password&username=<user_name>&password=<password>" -u"<client_id>:<client_secret>" http://localhost:8000/o/token/

但是,当我通过访问令牌向PSA注册我的用户时,我想为自己的应用创建一个OAuth2 Toolkit令牌,并将其作为JSON返回给客户端,以便它可以使用它来通过我的API发出请求.

目前,我生成令牌简单地使用generate_tokenoauthlib,是好的做法呢?我应该考虑其他因素吗?

from oauthlib.common import generate_token

...

@psa('social:complete')
def register_by_access_token(request, backend):
    # This view expects an access_token GET parameter, if it's needed,
    # request.backend and request.strategy will be loaded with the current
    # backend and strategy.
    token = request.GET.get('access_token')
    user = request.backend.do_auth(token)

    if user:
        login(request, user)
        app = Application.objects.get(name="myapp")

        # We delete the old one
        try:
            old = AccessToken.objects.get(user=user, application=app)
        except:
            pass
        else:
            old.delete()

        # We create a new one
        tok = generate_token()

        AccessToken.objects.get_or_create(user=user,
                                          application=app,
                                          expires=now() + timedelta(days=365),
                                          token=tok)

        return "OK" # I will eventually return JSON with the token
    else:
        return "ERROR"
Run Code Online (Sandbox Code Playgroud)

Dra*_*obZ 3

我最近使用https://github.com/PhilipGarnero/django-rest-framework-social-oauth2来实现此目的,就像用户Felix D.建议的那样。下面是我的实现:

class TokenHandler:
    application = Application.objects.filter(name=APPLICATION_NAME).values('client_id', 'client_secret')

    def handle_token(self, request):
        """
        Gets the latest token (to access my API) and if it's expired, check to see if the social token has expired.
        If the social token has expired, then the user must log back in to access the API. If it hasn't expired, 
        (my) token is refreshed.
        """
        try:
            token_list = AccessToken.objects.filter(user=request.user)\
                .order_by('-id').values('token', 'expires')
            if token_list[0]['expires'] < datetime.now(timezone.utc):
                if not self.social_token_is_expired(request):
                    token = self.refresh_token(request)
                else:
                    token = 'no_valid_token'
            else:
                token = token_list[0]['token']
        except IndexError:  # happens where there are no old tokens to check
            token = self.convert_social_token(request)
        except TypeError:  # happens when an anonymous user attempts to get a token for the API

            token = 'no_valid_token'
        return token

    def convert_social_token(self, request):
        grant_type = 'convert_token'
        client_id = self.application[0]['client_id']
        client_secret = self.application[0]['client_secret']
        try:
            user_social_auth = request.user.social_auth.filter(user=request.user).values('provider', 'extra_data')[0]
            backend = user_social_auth['provider']
            token = user_social_auth['extra_data']['access_token']
            url = get_base_url(request) + reverse('convert_token')
            fields = {'client_id': client_id, 'client_secret': client_secret, 'grant_type': grant_type,
                      'backend': backend,
                      'token': token}
            if backend == 'azuread-oauth2':
                fields['id_token'] = user_social_auth['extra_data']['id_token']
            response = requests.post(url, data=fields)
            response_dict = json.loads(response.text)
        except IndexError:
            return {'error': 'You must use an OAuth account to access the API.'}
        except UserSocialAuth.DoesNotExist:
            return {'error': 'You must use an OAuth account to access the API.'}
        return response_dict['access_token']

    def refresh_token(self, request):
        grant_type = 'refresh_token'
        client_id = self.application[0]['client_id']
        client_secret = self.application[0]['client_secret']
        try:
            refresh_token_object = RefreshToken.objects.filter(user=request.user).order_by('-id').values('token')[0]
            token = refresh_token_object['token']
            url = get_base_url(request) + reverse('token')
            fields = {'client_id': client_id, 'client_secret': client_secret, 'grant_type': grant_type,
                      'refresh_token': token}
            response = requests.post(url, data=fields)
            response_dict = json.loads(response.text)
        except RefreshToken.DoesNotExist:
            return {'error': 'You must use an OAuth account to access the API.'}

        return response_dict['access_token']

    @staticmethod
    def social_token_is_expired(request):
        user_social_auth = UserSocialAuth.objects.filter(user=request.user).values('provider', 'extra_data')[0]
        try:
            return float(user_social_auth['extra_data']['expires_on']) <= datetime.now().timestamp()
        except KeyError:  # social API did not provide an expiration
            return True  # if our token is expired and social API did not provide a time, we do consider them expired
Run Code Online (Sandbox Code Playgroud)