将 Keycloak 与 FastAPI 集成

Mao*_*ake 4 python oauth-2.0 openid-connect fastapi

我有一个用 FastAPI 和 SvelteKit 编写的应用程序,这两个应用程序都在单独的容器中运行,当前的解决方案是 Svelte 将用户名和密码发送到 FastAPI 服务器,然后 FastAPI 服务器将签名的 JWT 返回到 svelte,用于进行身份验证与 FastAPI。

登录流程如下:Svelte -> 用户名 + 密码 -> FastAPI

FastAPI -> JWT -> Svelte(存储在 cookie 中)

这是发出请求时发生的情况 Svelte(Authorization: Bearer) -> FastAPI

我想摆脱我自己的用户名和密码并使用 KeyCloak 进行身份验证。我对 OAuth 非常陌生,所以我不知道我应该做什么,甚至不知道要搜索什么术语。

这是我理解我想要的: Svelte(goto "/login") -> 重定向到 keycloak -> 登录并获取令牌(以某种方式将令牌获取到 FastAPI,以便我可以签署自己的令牌并将其发送到 svelte)

当我发出请求时 Svelte -> FastAPI Token -> FastAPI

ily*_*sAj 7

我为 fastAPI 与 keycloak 集成编写了一些Python 代码,分享它可能会有所帮助。

声明授权函数

#/auth.py
from fastapi.security import OAuth2AuthorizationCodeBearer
from keycloak import KeycloakOpenID # pip require python-keycloak
from config import settings
from fastapi import Security, HTTPException, status,Depends
from pydantic import Json
from models import User

# This is used for fastapi docs authentification
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    authorizationUrl=settings.authorization_url, # https://sso.example.com/auth/
    tokenUrl=settings.token_url, # https://sso.example.com/auth/realms/example-realm/protocol/openid-connect/token
)

# This actually does the auth checks
# client_secret_key is not mandatory if the client is public on keycloak
keycloak_openid = KeycloakOpenID(
    server_url=settings.server_url, # https://sso.example.com/auth/
    client_id=settings.client_id, # backend-client-id
    realm_name=settings.realm, # example-realm
    client_secret_key=settings.client_secret, # your backend client secret
    verify=True
)

async def get_idp_public_key():
    return (
        "-----BEGIN PUBLIC KEY-----\n"
        f"{keycloak_openid.public_key()}"
        "\n-----END PUBLIC KEY-----"
    )

# Get the payload/token from keycloak
async def get_payload(token: str = Security(oauth2_scheme)) -> dict:
    try:
        return keycloak_openid.decode_token(
            token,
            key= await get_idp_public_key(),
            options={
                "verify_signature": True,
                "verify_aud": False,
                "exp": True
            }
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e), # "Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
# Get user infos from the payload
async def get_user_info(payload: dict = Depends(get_payload)) -> User:
    try:
        return User(
            id=payload.get("sub"),
            username=payload.get("preferred_username"),
            email=payload.get("email"),
            first_name=payload.get("given_name"),
            last_name=payload.get("family_name"),
            realm_roles=payload.get("realm_access", {}).get("roles", []),
            client_roles=payload.get("realm_access", {}).get("roles", [])
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e), # "Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

Run Code Online (Sandbox Code Playgroud)

定义您的模型,您可以使用您需要的任何模型:

#/models.py
from pydantic import BaseModel, EmailStr

class User(BaseModel):
    id: str
    username: str
    email: str
    first_name: str
    last_name: str
    realm_roles: list
    client_roles: list

class authConfiguration(BaseModel):
        server_url: str
        realm: str
        client_id: str
        client_secret: str
        authorization_url: str
        token_url: str
Run Code Online (Sandbox Code Playgroud)

定义你的配置:

#/config.py
from models import authConfiguration


settings = authConfiguration(
    server_url="http://localhost:8080/",
    realm="roc",
    client_id="rns:roc:portal",
    client_secret="",
    authorization_url="http://localhost:8080/realms/roc/protocol/openid-connect/auth",
    token_url="http://localhost:8080/realms/roc/protocol/openid-connect/token",
)

Run Code Online (Sandbox Code Playgroud)

最后定义您的路线并保护它们:

#/main.py
import uvicorn
from fastapi import FastAPI,Depends
from models import User
from auth import get_user_info

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/secure")
async def root(user: User = Depends(get_user_info)):
    return {"message": f"Hello {user.username} you have the following service: {user.realm_roles}"}


if __name__ == '__main__':
    uvicorn.run('main:app', host="127.0.0.1", port=8081)
Run Code Online (Sandbox Code Playgroud)

在 keycloak 端创建一个领域,并在这个领域内创建一个客户端。创建一个测试用户以使用它进行身份验证,仅此而已!

参考: