现在位置: 首页 > FastAPI 教程 > 正文

FastAPI 安全认证

FastAPI 内置了多种安全工具,支持 OAuth2、JWT 令牌、API Key 等常见的认证和授权方式。本节介绍如何实现基于 OAuth2 + JWT 的用户认证。


安全认证概述

FastAPI 支持的安全方案:

方案适用场景说明
HTTP Basic Auth简单内部服务用户名密码编码在请求头中,安全性较低
API Key服务间调用通过请求头、查询参数或 Cookie 传递密钥
OAuth2 + JWT前后端分离应用最常用的方案,安全且灵活

OAuth2 密码模式 + JWT

这是前后端分离应用中最常用的认证方案。流程:

  1. 客户端发送用户名和密码到 /token 端点
  2. 服务器验证凭据,返回 JWT 访问令牌
  3. 客户端在后续请求中携带令牌(Authorization: Bearer <token>
  4. 服务器验证令牌,识别用户身份

1. 安装依赖

pip install "python-jose[cryptography]" passlib[bcrypt]

2. 完整示例

实例

from datetime import datetime, timedelta
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# ===== 配置 =====
SECRET_KEY = "your-secret-key-keep-it-secret"  # 生产环境使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# ===== 密码哈希 =====
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# ===== OAuth2 方案 =====
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# ===== 数据模型 =====
class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


# ===== 模拟数据库 =====
fake_users_db = {
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": pwd_context.hash("secret"),  # 密码: secret
        "disabled": False,
    }
}


# ===== 工具函数 =====
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """生成密码哈希"""
    return pwd_context.hash(password)


def get_user(db: dict, username: str) -> UserInDB | None:
    """从数据库获取用户"""
    if username in db:
        return UserInDB(**db[username])
    return None


def authenticate_user(db: dict, username: str, password: str):
    """验证用户凭据"""
    user = get_user(db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """创建 JWT 访问令牌"""
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """从令牌中获取当前用户(依赖函数)"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无法验证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    user = get_user(fake_users_db, username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """获取当前活跃用户"""
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="用户已被禁用")
    return current_user


# ===== 路由 =====
app = FastAPI()


@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    """登录获取令牌"""
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """获取当前用户信息(需要认证)"""
    return current_user


@app.get("/users/me/items")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    """获取当前用户的条目(需要认证)"""
    return [{"item_id": "Foo", "owner": current_user.username}]

代码解析

密码哈希

使用 passlib 的 bcrypt 算法对密码进行哈希处理,确保数据库中不存储明文密码:

函数说明
pwd_context.hash(password)将明文密码转为哈希值
pwd_context.verify(plain, hashed)验证明文密码是否匹配哈希值

JWT 令牌

JWT(JSON Web Token)是一种安全的令牌格式,包含用户信息和过期时间:

操作函数说明
创建令牌jwt.encode()将数据编码为 JWT 字符串
解析令牌jwt.decode()解析并验证 JWT 字符串
设置过期"exp": expire令牌的过期时间
存储用户标识"sub": username令牌的主体(通常是用户名)

OAuth2PasswordBearer

告诉 FastAPI 从 Authorization: Bearer <token> 请求头中获取令牌:

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

tokenUrl="token" 指定客户端获取令牌的端点路径,会出现在 API 文档中。

SECRET_KEY 必须保密且足够复杂,生产环境应使用环境变量存储。如果密钥泄露,攻击者可以伪造任意用户的令牌。


使用 API 文档测试

配置 OAuth2 后,Swagger UI 会出现"Authorize"按钮:

  1. 点击 "Authorize"
  2. 输入用户名和密码(如 alice / secret
  3. 点击 "Authorize" 获取令牌
  4. 之后的所有请求都会自动携带令牌

小结

  • OAuth2 + JWT 是前后端分离应用最常用的认证方案
  • 密码必须哈希存储,不能明文保存
  • JWT 令牌包含用户标识和过期时间
  • 使用依赖注入(Depends)实现认证逻辑的复用
  • 生产环境中 SECRET_KEY 必须保密