FastAPI 安全认证
FastAPI 内置了多种安全工具,支持 OAuth2、JWT 令牌、API Key 等常见的认证和授权方式。本节介绍如何实现基于 OAuth2 + JWT 的用户认证。
安全认证概述
FastAPI 支持的安全方案:
| 方案 | 适用场景 | 说明 |
|---|---|---|
| HTTP Basic Auth | 简单内部服务 | 用户名密码编码在请求头中,安全性较低 |
| API Key | 服务间调用 | 通过请求头、查询参数或 Cookie 传递密钥 |
| OAuth2 + JWT | 前后端分离应用 | 最常用的方案,安全且灵活 |
OAuth2 密码模式 + JWT
这是前后端分离应用中最常用的认证方案。流程:
- 客户端发送用户名和密码到
/token端点 - 服务器验证凭据,返回 JWT 访问令牌
- 客户端在后续请求中携带令牌(
Authorization: Bearer <token>) - 服务器验证令牌,识别用户身份
1. 安装依赖
pip install "python-jose[cryptography]" passlib[bcrypt]
2. 完整示例
实例
from datetime import datetime, timedelta
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# ===== 配置 =====
SECRET_KEY = "your-secret-key-keep-it-secret" # 生产环境使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ===== 密码哈希 =====
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ===== OAuth2 方案 =====
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ===== 数据模型 =====
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
# ===== 模拟数据库 =====
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": pwd_context.hash("secret"), # 密码: secret
"disabled": False,
}
}
# ===== 工具函数 =====
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def get_user(db: dict, username: str) -> UserInDB | None:
"""从数据库获取用户"""
if username in db:
return UserInDB(**db[username])
return None
def authenticate_user(db: dict, username: str, password: str):
"""验证用户凭据"""
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""创建 JWT 访问令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
"""从令牌中获取当前用户(依赖函数)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
"""获取当前活跃用户"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已被禁用")
return current_user
# ===== 路由 =====
app = FastAPI()
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
"""登录获取令牌"""
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""获取当前用户信息(需要认证)"""
return current_user
@app.get("/users/me/items")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""获取当前用户的条目(需要认证)"""
return [{"item_id": "Foo", "owner": current_user.username}]
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# ===== 配置 =====
SECRET_KEY = "your-secret-key-keep-it-secret" # 生产环境使用环境变量
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# ===== 密码哈希 =====
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ===== OAuth2 方案 =====
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ===== 数据模型 =====
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None
class UserInDB(User):
hashed_password: str
# ===== 模拟数据库 =====
fake_users_db = {
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": pwd_context.hash("secret"), # 密码: secret
"disabled": False,
}
}
# ===== 工具函数 =====
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""生成密码哈希"""
return pwd_context.hash(password)
def get_user(db: dict, username: str) -> UserInDB | None:
"""从数据库获取用户"""
if username in db:
return UserInDB(**db[username])
return None
def authenticate_user(db: dict, username: str, password: str):
"""验证用户凭据"""
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""创建 JWT 访问令牌"""
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
"""从令牌中获取当前用户(依赖函数)"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)],
):
"""获取当前活跃用户"""
if current_user.disabled:
raise HTTPException(status_code=400, detail="用户已被禁用")
return current_user
# ===== 路由 =====
app = FastAPI()
@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
"""登录获取令牌"""
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""获取当前用户信息(需要认证)"""
return current_user
@app.get("/users/me/items")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)],
):
"""获取当前用户的条目(需要认证)"""
return [{"item_id": "Foo", "owner": current_user.username}]
代码解析
密码哈希
使用 passlib 的 bcrypt 算法对密码进行哈希处理,确保数据库中不存储明文密码:
| 函数 | 说明 |
|---|---|
pwd_context.hash(password) | 将明文密码转为哈希值 |
pwd_context.verify(plain, hashed) | 验证明文密码是否匹配哈希值 |
JWT 令牌
JWT(JSON Web Token)是一种安全的令牌格式,包含用户信息和过期时间:
| 操作 | 函数 | 说明 |
|---|---|---|
| 创建令牌 | jwt.encode() | 将数据编码为 JWT 字符串 |
| 解析令牌 | jwt.decode() | 解析并验证 JWT 字符串 |
| 设置过期 | "exp": expire | 令牌的过期时间 |
| 存储用户标识 | "sub": username | 令牌的主体(通常是用户名) |
OAuth2PasswordBearer
告诉 FastAPI 从 Authorization: Bearer <token> 请求头中获取令牌:
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
tokenUrl="token" 指定客户端获取令牌的端点路径,会出现在 API 文档中。
SECRET_KEY必须保密且足够复杂,生产环境应使用环境变量存储。如果密钥泄露,攻击者可以伪造任意用户的令牌。
使用 API 文档测试
配置 OAuth2 后,Swagger UI 会出现"Authorize"按钮:
- 点击 "Authorize"
- 输入用户名和密码(如
alice/secret) - 点击 "Authorize" 获取令牌
- 之后的所有请求都会自动携带令牌
小结
- OAuth2 + JWT 是前后端分离应用最常用的认证方案
- 密码必须哈希存储,不能明文保存
- JWT 令牌包含用户标识和过期时间
- 使用依赖注入(
Depends)实现认证逻辑的复用 - 生产环境中 SECRET_KEY 必须保密
