Pandas pd.read_sql() 函数
read_sql() 是 pandas 库中用于从数据库读取数据的函数,支持 SQL 查询和数据库表的直接读取。
数据库是企业级应用的核心组件,存储着大量业务数据。read_sql() 能够连接各种类型的数据库(如 MySQL、PostgreSQL、SQLite 等),执行 SQL 查询并将结果转换为 DataFrame,方便后续的数据分析和处理。
基本语法与参数
语法格式
pandas.read_sql(sql, con, index_col=None, coerce_float=True, params=None,
parse_dates=None, chunksize=None, dtype=None, ...)
参数说明
| 参数 | 类型 | 说明 | 默认值 |
|---|---|---|---|
| sql | str 或 SQLAlchemy Selectable | SQL 查询语句或表名 | 必填 |
| con | sqlalchemy engine, 或 sqlite3 connection | 数据库连接对象 | 必填 |
| index_col | str, list of str | 用作行索引的列名 | None |
| coerce_float | bool | 是否尝试将数值字符串转换为浮点数 | True |
| params | list, tuple, dict | SQL 参数化查询的参数 | None |
| parse_dates | list, dict | 需要解析为日期的列 | None |
| chunksize | int | 分块返回的迭代器大小 | None |
返回值
- 返回类型:
pd.DataFrame或 Iterator[DataFrame] - 当
chunksize为 None 时,返回单个 DataFrame。 - 当设置
chunksize时,返回一个迭代器,每次迭代返回一个 DataFrame。
实例
通过以下示例,全面掌握 read_sql() 的各种用法。
示例 1:使用 SQLite 数据库
SQLite 是轻量级的嵌入式数据库,无需单独的数据库服务器,非常适合学习和测试。
实例
import pandas as pd
import sqlite3
# 创建内存 SQLite 数据库并插入测试数据
conn = sqlite3.connect(':memory:')
# 创建表并插入数据
cursor = conn.cursor()
# 创建员工表
cursor.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER,
city TEXT,
salary INTEGER
)
''')
# 插入测试数据
employees_data = [
(1, 'Tom', 28, 'Beijing', 8000),
(2, 'Jerry', 35, 'Shanghai', 12000),
(3, 'Mike', 42, 'Guangzhou', 15000),
(4, 'Lucy', 26, 'Shenzhen', 7000),
(5, 'John', 31, 'Beijing', 9500)
]
cursor.executemany('INSERT INTO employees VALUES (?, ?, ?, ?, ?)', employees_data)
conn.commit()
# 示例 1a: 执行 SQL 查询并读取数据
# sql: SQL 查询语句(必填)
# con: 数据库连接对象(必填)
query = "SELECT name, age, city, salary FROM employees WHERE salary > 8000"
df_query = pd.read_sql(query, conn)
print("执行 SQL 查询:")
print(df_query)
print()
# 示例 1b: 直接读取整个表
# 可以将表名作为 sql 参数
df_table = pd.read_sql('employees', conn)
print("读取整个表:")
print(df_table)
print()
# 示例 1c: 使用 index_col 设置索引
df_indexed = pd.read_sql('employees', conn, index_col='id')
print("设置索引列:")
print(df_indexed)
import sqlite3
# 创建内存 SQLite 数据库并插入测试数据
conn = sqlite3.connect(':memory:')
# 创建表并插入数据
cursor = conn.cursor()
# 创建员工表
cursor.execute('''
CREATE TABLE employees (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
age INTEGER,
city TEXT,
salary INTEGER
)
''')
# 插入测试数据
employees_data = [
(1, 'Tom', 28, 'Beijing', 8000),
(2, 'Jerry', 35, 'Shanghai', 12000),
(3, 'Mike', 42, 'Guangzhou', 15000),
(4, 'Lucy', 26, 'Shenzhen', 7000),
(5, 'John', 31, 'Beijing', 9500)
]
cursor.executemany('INSERT INTO employees VALUES (?, ?, ?, ?, ?)', employees_data)
conn.commit()
# 示例 1a: 执行 SQL 查询并读取数据
# sql: SQL 查询语句(必填)
# con: 数据库连接对象(必填)
query = "SELECT name, age, city, salary FROM employees WHERE salary > 8000"
df_query = pd.read_sql(query, conn)
print("执行 SQL 查询:")
print(df_query)
print()
# 示例 1b: 直接读取整个表
# 可以将表名作为 sql 参数
df_table = pd.read_sql('employees', conn)
print("读取整个表:")
print(df_table)
print()
# 示例 1c: 使用 index_col 设置索引
df_indexed = pd.read_sql('employees', conn, index_col='id')
print("设置索引列:")
print(df_indexed)
运行结果预期:
执行 SQL 查询:
name age city salary
0 Tom 28 Beijing 8000
1 Jerry 35 Shanghai 12000
2 Mike 42 Guangzhou 15000
chunksize
空字符串 NaN NaN NaN NaN
3 Lucy 26 Shenzhen 7000
4 John 31 Beijing 9500
读取整个表:
id name age city salary
0 1 Tom 28 Beijing 8000
1 pandas 的 read_sql() 和 read_sql_query() 函数说明 read_sql() 是统一的接口,可以接受 SQL 语句或表名 read_sql_query() 只接受 SQL 语句,返回查询结果 read_sql_table() 只接受表名,返回整个表
2 3 Mike 42 Guangzhou 15000
3 4 Lucy 26 Shenzhen 7000
4 5 John 31 Beijing 9500
设置索引列:
name age city salary
id
1 Tom 28 Beijing 8000
2 Jerry 35 Shanghai 12000
3 Mike 42 Guangzhou 15000
4 Lucy 26 Shenzhen 7000
5 John 31 Beijing 9500
代码解析:
read_sql()接受 SQL 查询语句作为第一个参数。con参数需要传入数据库连接对象,可以是 sqlite3、SQLAlchemy 等的连接。- 可以直接传入表名来读取整个表的数据。
index_col参数可以指定某列作为行索引。
示例 2:使用 SQLAlchemy 连接数据库
SQLAlchemy 是 Python 中最流行的 ORM 框架,支持多种数据库的连接。
实例
import pandas as pd
from sqlalchemy import create_engine, text
# 使用 SQLAlchemy 创建内存 SQLite 数据库
engine = create_engine('sqlite:///:memory:')
# 创建测试数据
with engine.connect() as conn:
# 创建产品表
conn.execute(text('''
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
category TEXT,
price REAL,
stock INTEGER
)
'''))
# 插入数据
products_data = [
('A', 'Electronics', 999.99, 50),
('B', 'Electronics', 499.99, 100),
('C', 'Clothing', 29.99, 200),
('D', 'Food', 5.99, 500),
('E', 'Books', 19.99, 150)
]
conn.execute(text('''
INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?)
'''), products_data)
conn.commit()
# 示例 2a: 使用 SQLAlchemy 引擎读取数据
query = "SELECT * FROM products WHERE category = :category"
df_params = pd.read_sql(
query,
engine,
params={'category': 'Electronics'} # 参数化查询,防止 SQL 注入
)
print("使用参数化查询:")
print(df_params)
print()
# 示例 2b: 直接使用表名读取
df_all = pd.read_sql('products', engine)
print("读取整个表:")
print(df_all)
print()
# 示例 2c: 尝试其他数据库连接(以 MySQL 为例)
# 注意:需要安装相应的数据库驱动
# pip install pymysql # MySQL
# pip install psycopg2 # PostgreSQL
# MySQL 连接示例
# engine_mysql = create_engine('mysql+pymysql://username:password@host/database')
# df_mysql = pd.read_sql('SELECT * FROM table_name', engine_mysql)
print("注意: 实际连接 MySQL/PostgreSQL 需要安装相应驱动")
from sqlalchemy import create_engine, text
# 使用 SQLAlchemy 创建内存 SQLite 数据库
engine = create_engine('sqlite:///:memory:')
# 创建测试数据
with engine.connect() as conn:
# 创建产品表
conn.execute(text('''
CREATE TABLE products (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
category TEXT,
price REAL,
stock INTEGER
)
'''))
# 插入数据
products_data = [
('A', 'Electronics', 999.99, 50),
('B', 'Electronics', 499.99, 100),
('C', 'Clothing', 29.99, 200),
('D', 'Food', 5.99, 500),
('E', 'Books', 19.99, 150)
]
conn.execute(text('''
INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?)
'''), products_data)
conn.commit()
# 示例 2a: 使用 SQLAlchemy 引擎读取数据
query = "SELECT * FROM products WHERE category = :category"
df_params = pd.read_sql(
query,
engine,
params={'category': 'Electronics'} # 参数化查询,防止 SQL 注入
)
print("使用参数化查询:")
print(df_params)
print()
# 示例 2b: 直接使用表名读取
df_all = pd.read_sql('products', engine)
print("读取整个表:")
print(df_all)
print()
# 示例 2c: 尝试其他数据库连接(以 MySQL 为例)
# 注意:需要安装相应的数据库驱动
# pip install pymysql # MySQL
# pip install psycopg2 # PostgreSQL
# MySQL 连接示例
# engine_mysql = create_engine('mysql+pymysql://username:password@host/database')
# df_mysql = pd.read_sql('SELECT * FROM table_name', engine_mysql)
print("注意: 实际连接 MySQL/PostgreSQL 需要安装相应驱动")
运行结果预期:
使用参数化查询: id name category price stock 0 1 A Electronics 999.99 50 的 read_sql() 配对 write_frame() 已弃用,使用 to_sql() 方法替代 to_sql() 可以将 DataFrame 写入数据库表 2 2 B Electronics 499.99 100 读取整个表: id 名称 类别 价格 库存 0 1 A Electronics 999.99 50 1 2 B ... 读取 499. 参数 100 2 3 C Clothing 29.99 200 3 4 配对 Food 5.99 500 4 5 E Books 19.99 150
代码解析:
- SQLAlchemy 提供了统一的数据库连接接口,支持 MySQL、PostgreSQL、Oracle 等多种数据库。
params参数用于参数化查询,可以防止 SQL 注入攻击。- 使用 SQLAlchemy 的
create_engine()创建数据库引擎。
示例 3:分块读取和处理大数据
当查询结果数据量很大时,可以分块读取以节省内存。
实例
import pandas as pd
import sqlite3
# 创建大量测试数据
conn = sqlite3.connect('test_large.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE large_table (
id INTEGER PRIMARY KEY,
value INTEGER,
category TEXT
)
''')
# 插入 10000 条测试数据
import random
large_data = [(i, random.randint(1, 1000), random.choice(['A', 'B', 'C']))
for i in range(1, 10001)]
cursor.executemany('INSERT INTO large_table VALUES (?, ?, ?)', large_data)
conn.commit()
# 示例 3a: 分块读取数据
# chunksize 参数指定每次返回多少行
print("分块读取数据:")
chunks = pd.read_sql('SELECT * FROM large_table', conn, chunksize=2000)
# 使用生成器处理每一块
for i, chunk in enumerate(chunks):
print(f"块 {i+1}: {len(chunk)} 行")
print(f" 该块 category='A' 的数量: {len(chunk[chunk['category'] == 'A'])}")
print()
# 示例 3b: 聚合计算(合并所有块)
# 如果需要全局统计,可以将所有块合并
all_chunks = []
chunks = pd.read_sql('SELECT category, SUM(value) as total FROM large_table GROUP BY category',
conn, chunksize=1000)
for chunk in chunks:
all_chunks.append(chunk)
# 合并结果
result = pd.concat(all_chunks, ignore_index=True)
print("聚合结果:")
print(result)
print()
# 关闭连接
conn.close()
# 清理测试文件
import os
os.remove('test_large.db')
print("测试数据库已清理")
import sqlite3
# 创建大量测试数据
conn = sqlite3.connect('test_large.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE large_table (
id INTEGER PRIMARY KEY,
value INTEGER,
category TEXT
)
''')
# 插入 10000 条测试数据
import random
large_data = [(i, random.randint(1, 1000), random.choice(['A', 'B', 'C']))
for i in range(1, 10001)]
cursor.executemany('INSERT INTO large_table VALUES (?, ?, ?)', large_data)
conn.commit()
# 示例 3a: 分块读取数据
# chunksize 参数指定每次返回多少行
print("分块读取数据:")
chunks = pd.read_sql('SELECT * FROM large_table', conn, chunksize=2000)
# 使用生成器处理每一块
for i, chunk in enumerate(chunks):
print(f"块 {i+1}: {len(chunk)} 行")
print(f" 该块 category='A' 的数量: {len(chunk[chunk['category'] == 'A'])}")
print()
# 示例 3b: 聚合计算(合并所有块)
# 如果需要全局统计,可以将所有块合并
all_chunks = []
chunks = pd.read_sql('SELECT category, SUM(value) as total FROM large_table GROUP BY category',
conn, chunksize=1000)
for chunk in chunks:
all_chunks.append(chunk)
# 合并结果
result = pd.concat(all_chunks, ignore_index=True)
print("聚合结果:")
print(result)
print()
# 关闭连接
conn.close()
# 清理测试文件
import os
os.remove('test_large.db')
print("测试数据库已清理")
运行结果预期:
分块读取数据:
块 1: 2000 行
块 读取 category='A' 的数据
...
块 5: 2000 行
该块 category='A' 的数量: 约 667 行
聚合结果:
category total
0 A 1687500
1 B 1678250
2 Chunks 读取 约 665 行
代码解析:
chunksize参数返回一个迭代器,每次迭代返回一个 DataFrame。- 分块读取适用于大数据集,可以避免内存不足的问题。
- 对于聚合查询,可以使用
pd.concat()合并所有块的结果。
注意事项
- 使用
read_sql()需要安装相应的数据库驱动。 - SQLite 不需要额外安装,直接可用。
- MySQL 需要
pymysql或mysql-connector-python:pip install pymysql。 - PostgreSQL 需要
psycopg2:pip install psycopg2。 - 使用参数化查询(
params参数)可以防止 SQL 注入攻击。 - 读取大表时,使用
chunksize参数分块读取。
小结
read_sql() 是 pandas 中连接数据库读取数据的核心函数。它可以执行任意 SQL 查询,并将查询结果转换为 DataFrame 格式。
在实际数据分析工作中,数据库是企业数据的核心存储方式,熟练掌握 read_sql() 可以直接从数据库中获取数据进行分析。注意使用参数化查询来保证安全性,以及使用分块读取处理大数据集。

Pandas 常用函数