现在位置: 首页 > FastAPI 教程 > 正文

FastAPI 数据库集成

FastAPI 可以与各种数据库集成,最常用的方式是使用 SQLAlchemy 作为 ORM。本节介绍如何使用 FastAPI + SQLAlchemy 构建具有 CRUD(创建、读取、更新、删除)功能的 API。


安装依赖

pip install sqlalchemy

项目结构

project/
├── main.py          # FastAPI 应用入口
├── database.py      # 数据库连接配置
├── models.py        # SQLAlchemy 模型
├── schemas.py       # Pydantic 数据模型
└── crud.py          # 数据库操作函数

1. 数据库配置

database.py 中配置数据库连接:

实例

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# SQLite 数据库(适合开发)
SQLALCHEMY_DATABASE_URL = "sqlite:///./sql_app.db"

# 创建数据库引擎
# connect_args 仅 SQLite 需要,允许多线程访问
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False}
)

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# 声明基类
class Base(DeclarativeBase):
    pass

开发环境使用 SQLite 即可,它是一个文件数据库,无需安装额外的数据库服务。生产环境建议使用 PostgreSQL 或 MySQL,只需修改 SQLALCHEMY_DATABASE_URL 即可。


2. SQLAlchemy 模型

models.py 中定义数据库表结构:

实例

# models.py
from sqlalchemy import Column, Integer, String
from database import Base


class User(Base):
    __tablename__ = "users"  # 表名

    id = Column(Integer, primary_key=True, index=True)  # 主键,自增
    email = Column(String, unique=True, index=True)      # 唯一,建索引
    hashed_password = Column(String)                      # 哈希密码
    is_active = Column(Boolean, default=True)             # 是否活跃


class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, index=True)                   # 标题,建索引
    description = Column(String)                          # 描述
    owner_id = Column(Integer, ForeignKey("users.id"))   # 外键关联 users 表

3. Pydantic 数据模型

schemas.py 中定义 API 的输入/输出数据结构:

实例

# schemas.py
from pydantic import BaseModel


# ===== 用户模型 =====
class UserBase(BaseModel):
    email: str


class UserCreate(UserBase):
    password: str          # 创建时需要密码


class UserOut(UserBase):
    id: int
    is_active: bool

    model_config = ConfigDict(from_attributes=True)  # 支持 ORM 对象


# ===== 商品模型 =====
class ItemBase(BaseModel):
    title: str
    description: str | None = None


class ItemCreate(ItemBase):
    pass


class ItemOut(ItemBase):
    id: int
    owner_id: int

    model_config = ConfigDict(from_attributes=True)

Pydantic 模型与 SQLAlchemy 模型是分离的。Pydantic 负责 API 层的数据校验,SQLAlchemy 负责数据库层的表结构。from_attributes=True 让 Pydantic 能够从 SQLAlchemy 对象中读取数据。


4. 数据库操作函数

crud.py 中封装数据库操作:

实例

# crud.py
from sqlalchemy.orm import Session
import models, schemas


def get_user(db: Session, user_id: int):
    """根据 ID 获取用户"""
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str):
    """根据邮箱获取用户"""
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, skip: int = 0, limit: int = 100):
    """获取用户列表"""
    return db.query(models.User).offset(skip).limit(limit).all()


def create_user(db: Session, user: schemas.UserCreate):
    """创建用户"""
    fake_hashed_password = user.password + "notreallyhashed"  # 实际应使用 passlib 哈希
    db_user = models.User(
        email=user.email,
        hashed_password=fake_hashed_password
    )
    db.add(db_user)
    db.commit()        # 提交事务
    db.refresh(db_user)  # 刷新对象,获取数据库生成的 id
    return db_user


def get_items(db: Session, skip: int = 0, limit: int = 100):
    """获取商品列表"""
    return db.query(models.Item).offset(skip).limit(limit).all()


def create_item(db: Session, item: schemas.ItemCreate, user_id: int):
    """创建商品"""
    db_item = models.Item(**item.model_dump(), owner_id=user_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

5. FastAPI 应用入口

main.py 中整合所有组件:

实例

# main.py
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.orm import Session
from pydantic import ConfigDict

import models, schemas, crud
from database import engine, SessionLocal, Base

# 创建数据库表
Base.metadata.create_all(bind=engine)

app = FastAPI()


# 依赖:获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# ===== 用户路由 =====
@app.post("/users/", response_model=schemas.UserOut)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    # 检查邮箱是否已存在
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(status_code=400, detail="邮箱已注册")
    return crud.create_user(db=db, user=user)


@app.get("/users/", response_model=list[schemas.UserOut])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = crud.get_users(db, skip=skip, limit=limit)
    return users


@app.get("/users/{user_id}", response_model=schemas.UserOut)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = crud.get_user(db, user_id=user_id)
    if db_user is None:
        raise HTTPException(status_code=404, detail="用户不存在")
    return db_user


# ===== 商品路由 =====
@app.post("/users/{user_id}/items/", response_model=schemas.ItemOut)
def create_item_for_user(
    user_id: int,
    item: schemas.ItemCreate,
    db: Session = Depends(get_db),
):
    return crud.create_item(db=db, item=item, user_id=user_id)


@app.get("/items/", response_model=list[schemas.ItemOut])
def read_items(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    items = crud.get_items(db, skip=skip, limit=limit)
    return items

数据库会话的依赖注入

核心模式是使用依赖注入管理数据库会话:

def get_db():
    db = SessionLocal()   # 创建会话
    try:
        yield db          # 提供会话给路由函数
    finally:
        db.close()        # 请求完成后关闭会话

每个请求获取独立的数据库会话,请求完成后自动关闭,避免资源泄露。


不同数据库的连接字符串

数据库连接字符串示例
SQLitesqlite:///./sql_app.db
PostgreSQLpostgresql://user:password@localhost/dbname
MySQLmysql+pymysql://user:password@localhost/dbname

切换数据库只需修改连接字符串,其余代码无需改动。这也是使用 ORM 的优势之一。


小结

  • 使用 SQLAlchemy 作为 ORM,FastAPI 通过依赖注入管理数据库会话
  • Pydantic 模型(schemas.py)负责 API 数据校验,SQLAlchemy 模型(models.py)负责数据库表结构
  • CRUD 操作封装在 crud.py 中,保持代码清晰
  • 开发使用 SQLite,生产切换到 PostgreSQL/MySQL 只需改连接字符串
  • from_attributes=True 让 Pydantic 能从 ORM 对象读取数据