FastAPI 数据库集成
FastAPI 可以与各种数据库集成,最常用的方式是使用 SQLAlchemy 作为 ORM。本节介绍如何使用 FastAPI + SQLAlchemy 构建具有 CRUD(创建、读取、更新、删除)功能的 API。
安装依赖
pip install sqlalchemy
项目结构
project/ ├── main.py # FastAPI 应用入口 ├── database.py # 数据库连接配置 ├── models.py # SQLAlchemy 模型 ├── schemas.py # Pydantic 数据模型 └── crud.py # 数据库操作函数
1. 数据库配置
在 database.py 中配置数据库连接:
实例
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# SQLite 数据库(适合开发)
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# 创建数据库引擎
# connect_args 仅 SQLite 需要,允许多线程访问
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
class Base(DeclarativeBase):
pass
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# SQLite 数据库(适合开发)
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"
# 创建数据库引擎
# connect_args 仅 SQLite 需要,允许多线程访问
engine = create_engine(
SQLALCHEMY_DATABASE_URL,
connect_args={"check_same_thread": False}
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
class Base(DeclarativeBase):
pass
开发环境使用 SQLite 即可,它是一个文件数据库,无需安装额外的数据库服务。生产环境建议使用 PostgreSQL 或 MySQL,只需修改
SQLALCHEMY_DATABASE_URL即可。
2. SQLAlchemy 模型
在 models.py 中定义数据库表结构:
实例
# models.py
from sqlalchemy import Column, Integer, String
from database import Base
class User(Base):
__tablename__ = "users" # 表名
id = Column(Integer, primary_key=True, index=True) # 主键,自增
email = Column(String, unique=True, index=True) # 唯一,建索引
hashed_password = Column(String) # 哈希密码
is_active = Column(Boolean, default=True) # 是否活跃
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True) # 标题,建索引
description = Column(String) # 描述
owner_id = Column(Integer, ForeignKey("users.id")) # 外键关联 users 表
from sqlalchemy import Column, Integer, String
from database import Base
class User(Base):
__tablename__ = "users" # 表名
id = Column(Integer, primary_key=True, index=True) # 主键,自增
email = Column(String, unique=True, index=True) # 唯一,建索引
hashed_password = Column(String) # 哈希密码
is_active = Column(Boolean, default=True) # 是否活跃
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True) # 标题,建索引
description = Column(String) # 描述
owner_id = Column(Integer, ForeignKey("users.id")) # 外键关联 users 表
3. Pydantic 数据模型
在 schemas.py 中定义 API 的输入/输出数据结构:
实例
# schemas.py
from pydantic import BaseModel
# ===== 用户模型 =====
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str # 创建时需要密码
class UserOut(UserBase):
id: int
is_active: bool
model_config = ConfigDict(from_attributes=True) # 支持 ORM 对象
# ===== 商品模型 =====
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class ItemOut(ItemBase):
id: int
owner_id: int
model_config = ConfigDict(from_attributes=True)
from pydantic import BaseModel
# ===== 用户模型 =====
class UserBase(BaseModel):
email: str
class UserCreate(UserBase):
password: str # 创建时需要密码
class UserOut(UserBase):
id: int
is_active: bool
model_config = ConfigDict(from_attributes=True) # 支持 ORM 对象
# ===== 商品模型 =====
class ItemBase(BaseModel):
title: str
description: str | None = None
class ItemCreate(ItemBase):
pass
class ItemOut(ItemBase):
id: int
owner_id: int
model_config = ConfigDict(from_attributes=True)
Pydantic 模型与 SQLAlchemy 模型是分离的。Pydantic 负责 API 层的数据校验,SQLAlchemy 负责数据库层的表结构。
from_attributes=True让 Pydantic 能够从 SQLAlchemy 对象中读取数据。
4. 数据库操作函数
在 crud.py 中封装数据库操作:
实例
# crud.py
from sqlalchemy.orm import Session
import models, schemas
def get_user(db: Session, user_id: int):
"""根据 ID 获取用户"""
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
"""根据邮箱获取用户"""
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
"""获取用户列表"""
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
"""创建用户"""
fake_hashed_password = user.password + "notreallyhashed" # 实际应使用 passlib 哈希
db_user = models.User(
email=user.email,
hashed_password=fake_hashed_password
)
db.add(db_user)
db.commit() # 提交事务
db.refresh(db_user) # 刷新对象,获取数据库生成的 id
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
"""获取商品列表"""
return db.query(models.Item).offset(skip).limit(limit).all()
def create_item(db: Session, item: schemas.ItemCreate, user_id: int):
"""创建商品"""
db_item = models.Item(**item.model_dump(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
from sqlalchemy.orm import Session
import models, schemas
def get_user(db: Session, user_id: int):
"""根据 ID 获取用户"""
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
"""根据邮箱获取用户"""
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, skip: int = 0, limit: int = 100):
"""获取用户列表"""
return db.query(models.User).offset(skip).limit(limit).all()
def create_user(db: Session, user: schemas.UserCreate):
"""创建用户"""
fake_hashed_password = user.password + "notreallyhashed" # 实际应使用 passlib 哈希
db_user = models.User(
email=user.email,
hashed_password=fake_hashed_password
)
db.add(db_user)
db.commit() # 提交事务
db.refresh(db_user) # 刷新对象,获取数据库生成的 id
return db_user
def get_items(db: Session, skip: int = 0, limit: int = 100):
"""获取商品列表"""
return db.query(models.Item).offset(skip).limit(limit).all()
def create_item(db: Session, item: schemas.ItemCreate, user_id: int):
"""创建商品"""
db_item = models.Item(**item.model_dump(), owner_id=user_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
5. FastAPI 应用入口
在 main.py 中整合所有组件:
实例
# main.py
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from pydantic import ConfigDict
import models, schemas, crud
from database import engine, SessionLocal, Base
# 创建数据库表
Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ===== 用户路由 =====
@app.post("/users/", response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# 检查邮箱是否已存在
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="邮箱已注册")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.UserOut])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.UserOut)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="用户不存在")
return db_user
# ===== 商品路由 =====
@app.post("/users/{user_id}/items/", response_model=schemas.ItemOut)
def create_item_for_user(
user_id: int,
item: schemas.ItemCreate,
db: Session = Depends(get_db),
):
return crud.create_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.ItemOut])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from pydantic import ConfigDict
import models, schemas, crud
from database import engine, SessionLocal, Base
# 创建数据库表
Base.metadata.create_all(bind=engine)
app = FastAPI()
# 依赖:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ===== 用户路由 =====
@app.post("/users/", response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
# 检查邮箱是否已存在
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=400, detail="邮箱已注册")
return crud.create_user(db=db, user=user)
@app.get("/users/", response_model=list[schemas.UserOut])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users
@app.get("/users/{user_id}", response_model=schemas.UserOut)
def read_user(user_id: int, db: Session = Depends(get_db)):
db_user = crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="用户不存在")
return db_user
# ===== 商品路由 =====
@app.post("/users/{user_id}/items/", response_model=schemas.ItemOut)
def create_item_for_user(
user_id: int,
item: schemas.ItemCreate,
db: Session = Depends(get_db),
):
return crud.create_item(db=db, item=item, user_id=user_id)
@app.get("/items/", response_model=list[schemas.ItemOut])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
items = crud.get_items(db, skip=skip, limit=limit)
return items
数据库会话的依赖注入
核心模式是使用依赖注入管理数据库会话:
def get_db():
db = SessionLocal() # 创建会话
try:
yield db # 提供会话给路由函数
finally:
db.close() # 请求完成后关闭会话
每个请求获取独立的数据库会话,请求完成后自动关闭,避免资源泄露。
不同数据库的连接字符串
| 数据库 | 连接字符串示例 |
|---|---|
| SQLite | sqlite:///./sql_app.db |
| PostgreSQL | postgresql://user:password@localhost/dbname |
| MySQL | mysql+pymysql://user:password@localhost/dbname |
切换数据库只需修改连接字符串,其余代码无需改动。这也是使用 ORM 的优势之一。
小结
- 使用 SQLAlchemy 作为 ORM,FastAPI 通过依赖注入管理数据库会话
- Pydantic 模型(
schemas.py)负责 API 数据校验,SQLAlchemy 模型(models.py)负责数据库表结构 - CRUD 操作封装在
crud.py中,保持代码清晰 - 开发使用 SQLite,生产切换到 PostgreSQL/MySQL 只需改连接字符串
from_attributes=True让 Pydantic 能从 ORM 对象读取数据
