用户系统 — 注册、登录、JWT 认证
本章你将学会 FastAPI 的 JWT 认证方案,理解它和 Django/Flask Session 认证的区别。
JWT vs Session 认证
Django 和 Flask 用的是 Session 认证:服务器存储用户状态,Cookie 中只存 Session ID。
FastAPI 推荐 JWT 认证:用户登录后获得一个 Token(令牌),后续请求携带 Token 即可认证。
| 特性 | Session(Django/Flask) | JWT(FastAPI) |
|---|---|---|
| 存储位置 | 服务器端(内存/数据库) | 客户端(Cookie/LocalStorage) |
| 扩展性 | 服务器需共享 Session | 无状态,天然支持分布式 |
| 适用场景 | 服务端渲染的网站 | API + SPA / 移动端 |
| 过期控制 | 服务端可随时失效 | Token 有效期内一直有效 |
安装依赖
(venv) $ pip install passlib python-jose python-multipart
- passlib:密码哈希(对标 werkzeug.security)
- python-jose:JWT 的生成和验证
- python-multipart:处理表单提交(OAuth2 登录表单)
定义 User 模型
实例
# 文件路径:models.py 新增
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
hashed_password = Column(String(256), nullable=False)
def set_password(self, password: str):
"""哈希密码"""
self.hashed_password = pwd_context.hash(password)
def verify_password(self, password: str) -> bool:
"""校验密码"""
return pwd_context.verify(password, self.hashed_password)
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(120), unique=True, nullable=False)
hashed_password = Column(String(256), nullable=False)
def set_password(self, password: str):
"""哈希密码"""
self.hashed_password = pwd_context.hash(password)
def verify_password(self, password: str) -> bool:
"""校验密码"""
return pwd_context.verify(password, self.hashed_password)
生成并执行迁移:
(venv) $ alembic revision --autogenerate -m "新增 User 模型" (venv) $ alembic upgrade head
JWT 工具函数
实例
# 文件路径:auth.py(新建文件)
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# 密钥(生产环境从环境变量读取)
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # Token 有效期:24 小时
# OAuth2PasswordBearer 告诉 FastAPI:从请求头 Authorization: Bearer <token> 中提取 JWT
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def create_access_token(data: dict) -> str:
"""生成 JWT Token"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict | None:
"""验证并解码 JWT Token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
# 密钥(生产环境从环境变量读取)
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 # Token 有效期:24 小时
# OAuth2PasswordBearer 告诉 FastAPI:从请求头 Authorization: Bearer <token> 中提取 JWT
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def create_access_token(data: dict) -> str:
"""生成 JWT Token"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_access_token(token: str) -> dict | None:
"""验证并解码 JWT Token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None
注册路由
实例
# 文件路径:routers/users.py
from fastapi import APIRouter, Depends, HTTPException, Request, Form
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from database import get_db
from models import User
from schemas import UserCreate, UserResponse
from auth import create_access_token
router = APIRouter(prefix="/users", tags=["用户"])
templates = Jinja2Templates(directory="templates")
@router.post("/register", response_model=UserResponse)
def register(
username: str = Form(...),
email: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db)
):
"""用户注册"""
# 检查用户名和邮箱是否已存在
if db.query(User).filter(User.username == username).first():
raise HTTPException(status_code=400, detail="用户名已被使用")
if db.query(User).filter(User.email == email).first():
raise HTTPException(status_code=400, detail="邮箱已被注册")
user = User(username=username, email=email)
user.set_password(password)
db.add(user)
db.commit()
db.refresh(user)
return user
from fastapi import APIRouter, Depends, HTTPException, Request, Form
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from database import get_db
from models import User
from schemas import UserCreate, UserResponse
from auth import create_access_token
router = APIRouter(prefix="/users", tags=["用户"])
templates = Jinja2Templates(directory="templates")
@router.post("/register", response_model=UserResponse)
def register(
username: str = Form(...),
email: str = Form(...),
password: str = Form(...),
db: Session = Depends(get_db)
):
"""用户注册"""
# 检查用户名和邮箱是否已存在
if db.query(User).filter(User.username == username).first():
raise HTTPException(status_code=400, detail="用户名已被使用")
if db.query(User).filter(User.email == email).first():
raise HTTPException(status_code=400, detail="邮箱已被注册")
user = User(username=username, email=email)
user.set_password(password)
db.add(user)
db.commit()
db.refresh(user)
return user
登录路由(签发 JWT)
实例
# 文件路径:routers/users.py 追加
from fastapi.security import OAuth2PasswordRequestForm
from auth import create_access_token
@router.post("/token")
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""
登录并获取 Token
使用 OAuth2PasswordRequestForm:字段名为 username 和 password(表单格式)
"""
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not user.verify_password(form_data.password):
raise HTTPException(
status_code=401,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"}
)
# 签发 JWT:只包含 user_id(不能放敏感信息)
access_token = create_access_token(data={"sub": str(user.id)})
return {"access_token": access_token, "token_type": "bearer"}
from fastapi.security import OAuth2PasswordRequestForm
from auth import create_access_token
@router.post("/token")
def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""
登录并获取 Token
使用 OAuth2PasswordRequestForm:字段名为 username 和 password(表单格式)
"""
user = db.query(User).filter(User.username == form_data.username).first()
if not user or not user.verify_password(form_data.password):
raise HTTPException(
status_code=401,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"}
)
# 签发 JWT:只包含 user_id(不能放敏感信息)
access_token = create_access_token(data={"sub": str(user.id)})
return {"access_token": access_token, "token_type": "bearer"}
JWT Token 中
sub是标准的 subject 字段,存储用户标识。Token 本身不加密(只是 Base64 编码),所以绝对不能在里面放密码等敏感信息。如果你想加密,用 JWE 而非 JWT。
在路由中使用 JWT 保护
实例
# 获取当前用户的依赖函数
from auth import oauth2_scheme, decode_access_token
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
"""从 JWT Token 中解析当前用户"""
payload = decode_access_token(token)
if payload is None:
raise HTTPException(status_code=401, detail="Token 无效或已过期")
user_id = payload.get("sub")
user = db.query(User).filter(User.id == int(user_id)).first()
if user is None:
raise HTTPException(status_code=401, detail="用户不存在")
return user
# 在需要登录的路由中注入
@router.get("/me", response_model=UserResponse)
def read_current_user(current_user: User = Depends(get_current_user)):
"""获取当前用户信息(需要登录)"""
return current_user
from auth import oauth2_scheme, decode_access_token
def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
):
"""从 JWT Token 中解析当前用户"""
payload = decode_access_token(token)
if payload is None:
raise HTTPException(status_code=401, detail="Token 无效或已过期")
user_id = payload.get("sub")
user = db.query(User).filter(User.id == int(user_id)).first()
if user is None:
raise HTTPException(status_code=401, detail="用户不存在")
return user
# 在需要登录的路由中注入
@router.get("/me", response_model=UserResponse)
def read_current_user(current_user: User = Depends(get_current_user)):
"""获取当前用户信息(需要登录)"""
return current_user
对于 SSR 页面,JWT 通常存到 Cookie 中(而非 Authorization Header),在模板路由中从 Cookie 提取 Token 验证。
本章小结
本章你实现了 FastAPI 的 JWT 认证:passlib 哈希密码、python-jose 签发验证 JWT、OAuth2PasswordBearer 提取 Token、Depends 实现认证依赖注入。
与 Django/Flask 的 Session 认证不同,JWT 是无状态方案,更适合前后端分离场景。
