Pandas 读取 SQL 数据库
Pandas 提供了一组直接与 SQL 数据库交互的函数,可以将查询结果直接读取为 DataFrame,也可以将 DataFrame 写回数据库。这使得数据分析师无需手动处理数据库连接和结果解析,大幅简化了数据库与 Python 的交互流程。
核心函数概览
| 函数 | 用途 | 返回类型 |
|---|---|---|
pd.read_sql() |
执行 SQL 查询或读取整张表(兼容两种场景的通用函数) | DataFrame |
pd.read_sql_query() |
执行 SQL 查询语句,适合复杂查询 | DataFrame |
pd.read_sql_table() |
直接读取整张表,仅支持 SQLAlchemy 连接 | DataFrame |
DataFrame.to_sql() |
将 DataFrame 写入数据库表 | None / int |
实际工作中推荐使用
pd.read_sql(),它会根据传入参数自动判断是执行查询还是读取整表,兼容性最好。pd.read_sql_table()仅支持 SQLAlchemy 引擎连接,不支持原生sqlite3等 DB-API 连接。
建立数据库连接
Pandas 本身不直接连接数据库,需要借助第三方库建立连接后传入。主流方式有两种:SQLAlchemy 引擎(推荐)和 DB-API 原生连接(轻量简单)。
方式一:SQLAlchemy(推荐)
SQLAlchemy 是 Python 最主流的数据库工具包,支持所有主流数据库,且与 Pandas 兼容性最好:
pip install sqlalchemy
实例
# SQLAlchemy 连接字符串格式:数据库类型+驱动://用户名:密码@主机:端口/数据库名
# SQLite(文件数据库,无需用户名密码)
engine = create_engine("sqlite:///mydata.db")
# MySQL
engine = create_engine("mysql+pymysql://root:password@localhost:3306/mydb")
# PostgreSQL
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/mydb")
# SQL Server
engine = create_engine("mssql+pyodbc://user:password@server/mydb?driver=ODBC+Driver+17+for+SQL+Server")
# 验证连接是否成功
with engine.connect() as conn:
print("连接成功")
方式二:sqlite3 原生连接(仅 SQLite)
Python 内置 sqlite3 模块,无需额外安装,适合轻量级本地 SQLite 数据库:
实例
# 连接到 SQLite 文件数据库(文件不存在时自动创建)
conn = sqlite3.connect("mydata.db")
# 连接到内存数据库(程序退出后数据消失,适合测试)
conn_memory = sqlite3.connect(":memory:")
# 使用完毕后需手动关闭连接
# conn.close()
pd.read_sql() 读取数据
基本语法
pd.read_sql(sql, con, index_col=None, coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
主要参数说明:
| 参数 | 类型 | 说明 |
|---|---|---|
sql |
str | SQL 查询语句,或表名(与 con 类型有关) |
con |
连接对象 | SQLAlchemy 引擎或 DB-API 连接对象 |
index_col |
str 或 list | 将指定列设为 DataFrame 的行索引 |
params |
list 或 dict | SQL 参数化查询的参数值,防止 SQL 注入 |
parse_dates |
list 或 dict | 将指定列解析为 datetime 类型 |
chunksize |
int | 分块读取,每块返回指定行数的 DataFrame 迭代器 |
1、读取整张表
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 读取整张 employees 表
df = pd.read_sql("employees", con=engine)
print(df.head())
print(f"共 {len(df)} 行,{len(df.columns)} 列")
2、执行 SQL 查询
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 带条件的查询
df = pd.read_sql("SELECT * FROM employees WHERE department = 'IT'", con=engine)
# 多表关联查询
sql = """
SELECT e.name, e.salary, d.department_name
FROM employees e
JOIN departments d ON e.dept_id = d.id
WHERE e.salary > 10000
ORDER BY e.salary DESC
"""
df = pd.read_sql(sql, con=engine)
print(df.head(10))
3、参数化查询(防止 SQL 注入)
查询条件来自用户输入时,务必使用参数化查询,不要用字符串拼接 SQL:
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# ❌ 危险写法:字符串拼接 SQL,存在 SQL 注入风险
dept = "IT"
# df = pd.read_sql(f"SELECT * FROM employees WHERE department = '{dept}'", engine)
# ✅ 安全写法:使用 ? 占位符(sqlite3)或 :name 命名参数(SQLAlchemy)
# SQLite / DB-API 风格(使用 ?)
df = pd.read_sql(
"SELECT * FROM employees WHERE department = ? AND salary > ?",
con=engine,
params=["IT", 8000] # params 是列表,按位置替换 ?
)
# SQLAlchemy 命名参数风格(使用 :param_name)
from sqlalchemy import text
with engine.connect() as conn:
df = pd.read_sql(
text("SELECT * FROM employees WHERE department = :dept AND salary > :min_salary"),
con=conn,
params={"dept": "IT", "min_salary": 8000}
)
print(df)
4、设置索引列
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 将数据库中的 id 列设为 DataFrame 的行索引
df = pd.read_sql("SELECT * FROM employees", con=engine, index_col="id")
print(df.head())
# 使用多列作为复合索引
df = pd.read_sql(
"SELECT * FROM orders",
con=engine,
index_col=["year", "month"] # 复合索引
)
5、解析日期列
数据库中存储的日期字段读取后默认是字符串类型,使用 parse_dates 参数可以直接解析为 datetime:
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 将 created_at 和 updated_at 列解析为 datetime 类型
df = pd.read_sql(
"SELECT * FROM orders",
con=engine,
parse_dates=["created_at", "updated_at"]
)
print(df.dtypes)
# created_at datetime64[ns]
# updated_at datetime64[ns]
# 也可以指定解析格式(针对非标准日期格式)
df = pd.read_sql(
"SELECT * FROM orders",
con=engine,
parse_dates={"created_at": "%Y%m%d"} # 将 "20240115" 这样的格式解析为日期
)
分块读取大数据(chunksize)
当数据库表数据量很大时,一次性全部读入内存会导致 OOM(内存溢出)。使用 chunksize 参数可以将数据分批读取,每次只处理一部分:
实例
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:password@localhost/bigdata")
# chunksize=10000 表示每次读取 10000 行,返回一个迭代器
chunks = pd.read_sql("SELECT * FROM large_table", con=engine, chunksize=10000)
# 逐块处理(每次只有 10000 行在内存中)
result_list = []
for i, chunk in enumerate(chunks):
# 对每块数据执行处理逻辑(如过滤、聚合等)
processed = chunk[chunk["status"] == "active"]
result_list.append(processed)
print(f"已处理第 {i+1} 块,当前块有效行数:{len(processed)}")
# 将所有处理后的块合并为一个 DataFrame
final_df = pd.concat(result_list, ignore_index=True)
print(f"最终共 {len(final_df)} 行有效数据")
pd.read_sql_query() 和 pd.read_sql_table()
pd.read_sql_query():只执行查询
与 pd.read_sql() 功能基本相同,但只接受 SQL 查询语句,不接受表名。参数完全一致,适合需要明确区分"查询"和"读表"操作的场景:
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
df = pd.read_sql_query(
"SELECT name, salary FROM employees WHERE salary > 5000",
con=engine
)
print(df)
pd.read_sql_table():读取整张表(仅 SQLAlchemy)
pd.read_sql_table() 专门用于读取整张表,支持通过参数过滤列和行,但仅支持 SQLAlchemy 引擎连接,不支持 sqlite3 等原生连接:
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 读取 employees 表的指定列
df = pd.read_sql_table(
"employees",
con=engine,
columns=["id", "name", "salary", "department"] # 只读取这几列
)
# 还支持指定 schema(数据库中的模式)
df = pd.read_sql_table(
"employees",
con=engine,
schema="hr" # 读取 hr 模式下的 employees 表
)
print(df.head())
将 DataFrame 写入数据库(to_sql)
使用 DataFrame.to_sql() 可以将 DataFrame 写入数据库,支持新建表、追加数据或覆盖原表:
基本语法
DataFrame.to_sql(name, con, schema=None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype=None, method=None)
if_exists 参数决定目标表已存在时的行为:
| if_exists 值 | 行为 | 适用场景 |
|---|---|---|
'fail'(默认) |
表已存在时报错 | 防止意外覆盖已有数据 |
'replace' |
先删除原表,再重新建表写入 | 全量刷新数据 |
'append' |
向已有表追加数据,不改变表结构 | 增量写入新数据 |
实例
from sqlalchemy import create_engine
engine = create_engine("sqlite:///mydata.db")
# 准备示例数据
df = pd.DataFrame({
"name": ["张三", "李四", "王五"],
"department": ["IT", "HR", "IT"],
"salary": [12000, 8000, 15000]
})
# 将 DataFrame 写入 employees 表,如果已存在则替换
df.to_sql(
"employees",
con=engine,
if_exists="replace", # 覆盖原有数据
index=False # 不将 DataFrame 的行索引写入数据库(通常不需要)
)
# 向已有表追加新数据(注意:列名和数据类型必须匹配)
new_employees = pd.DataFrame({
"name": ["赵六"],
"department": ["Finance"],
"salary": [11000]
})
new_employees.to_sql("employees", con=engine, if_exists="append", index=False)
# 验证写入结果
result = pd.read_sql("SELECT * FROM employees", con=engine)
print(result)
指定列的数据类型
写入时可以通过 dtype 参数显式指定每列在数据库中的类型:
实例
from sqlalchemy import create_engine, Integer, String, Float, DateTime
engine = create_engine("sqlite:///mydata.db")
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["Alice", "Bob", "Charlie"],
"score": [92.5, 88.0, 95.3],
})
df.to_sql(
"students",
con=engine,
if_exists="replace",
index=False,
dtype={
"id": Integer(), # 整数类型
"name": String(50), # 变长字符串,最大 50 字符
"score": Float() # 浮点数
}
)
完整使用示例
下面是一个从 SQLite 数据库中读取销售数据并进行分析的完整示例:
实例
import sqlite3
from sqlalchemy import create_engine
# 第一步:准备测试数据库
engine = create_engine("sqlite:///sales.db")
conn = sqlite3.connect("sales.db")
# 创建示例数据并写入数据库
sales_data = pd.DataFrame({
"order_id": [1001, 1002, 1003, 1004, 1005, 1006],
"product": ["笔记本", "手机", "平板", "笔记本", "手机", "平板"],
"amount": [6999, 3999, 2999, 7299, 4299, 3199],
"quantity": [2, 5, 3, 1, 4, 2],
"order_date": ["2024-01-10", "2024-01-15", "2024-01-20",
"2024-02-05", "2024-02-10", "2024-02-18"]
})
sales_data.to_sql("sales", con=engine, if_exists="replace", index=False)
# 第二步:读取数据并解析日期
df = pd.read_sql(
"SELECT * FROM sales",
con=engine,
parse_dates=["order_date"]
)
print("原始数据:")
print(df)
print()
# 第三步:按产品统计销售额
sql_agg = """
SELECT product,
COUNT(*) AS 订单数,
SUM(amount * quantity) AS 总销售额,
AVG(amount) AS 平均单价
FROM sales
GROUP BY product
ORDER BY 总销售额 DESC
"""
summary = pd.read_sql(sql_agg, con=engine)
print("各产品销售汇总:")
print(summary)
print()
# 第四步:将统计结果写回数据库
summary.to_sql("sales_summary", con=engine, if_exists="replace", index=False)
print("汇总数据已写入 sales_summary 表")
conn.close()
以上代码执行结果为:
原始数据: order_id product amount quantity order_date 0 1001 笔记本 6999 2 2024-01-10 1 1002 手机 3999 5 2024-01-15 2 1003 平板 2999 3 2024-01-20 3 1004 笔记本 7299 1 2024-02-05 4 1005 手机 4299 4 2024-02-10 5 1006 平板 3199 2 2024-02-18 各产品销售汇总: product 订单数 总销售额 平均单价 0 手机 2 36991 4149.0 1 笔记本 2 21297 7149.0 2 平板 2 15395 3099.0 汇总数据已写入 sales_summary 表
常见问题与注意事项
1、连接用完后需要关闭
使用 DB-API 原生连接时(如 sqlite3),操作完成后需手动关闭连接。推荐用 with 语句自动管理:
实例
import pandas as pd
# 使用 with 语句,退出时自动关闭连接,即使发生异常也不会泄漏连接
with sqlite3.connect("mydata.db") as conn:
df = pd.read_sql("SELECT * FROM employees", con=conn)
print(df) # 连接已自动关闭,但 df 数据仍然可用
2、大表不要直接读取全部数据
对于几百万行以上的大表,直接 SELECT * 读取全量数据会耗尽内存。应优先在 SQL 层面过滤数据(WHERE、LIMIT),或使用 chunksize 分块读取。
3、read_sql_table 仅支持 SQLAlchemy
若使用 sqlite3 等 DB-API 原生连接调用 pd.read_sql_table(),会报 NotImplementedError。请改用 pd.read_sql() 或换用 SQLAlchemy 引擎。
4、to_sql 的 index 参数默认为 True
默认情况下 to_sql 会把 DataFrame 的行索引(0, 1, 2...)也写入数据库,形成一个名为 index 的列,通常这是多余的。建议明确传入 index=False。
5、数据库驱动需单独安装
SQLAlchemy 连接不同数据库时,还需安装对应的驱动包:
| 数据库 | 驱动包 | 安装命令 |
|---|---|---|
| MySQL | PyMySQL | pip install pymysql |
| PostgreSQL | psycopg2 | pip install psycopg2-binary |
| SQL Server | pyodbc | pip install pyodbc |
| Oracle | cx_Oracle | pip install cx_Oracle |
| SQLite | 内置 | 无需安装 |
