现在位置: 首页 > Pandas 教程 > 正文

Pandas pd.read_sql() 函数

Python math 模块 Pandas 常用函数


read_sql() 是 pandas 库中用于从数据库读取数据的函数,支持 SQL 查询和数据库表的直接读取。

数据库是企业级应用的核心组件,存储着大量业务数据。read_sql() 能够连接各种类型的数据库(如 MySQL、PostgreSQL、SQLite 等),执行 SQL 查询并将结果转换为 DataFrame,方便后续的数据分析和处理。


基本语法与参数

语法格式

pandas.read_sql(sql, con, index_col=None, coerce_float=True, params=None,
                parse_dates=None, chunksize=None, dtype=None, ...)

参数说明

参数类型说明默认值
sqlstr 或 SQLAlchemy SelectableSQL 查询语句或表名必填
consqlalchemy engine, 或 sqlite3 connection数据库连接对象必填
index_colstr, list of str用作行索引的列名None
coerce_floatbool是否尝试将数值字符串转换为浮点数True
paramslist, tuple, dictSQL 参数化查询的参数None
parse_dateslist, dict需要解析为日期的列None
chunksizeint分块返回的迭代器大小None

返回值

  • 返回类型pd.DataFrame 或 Iterator[DataFrame]
  • chunksize 为 None 时,返回单个 DataFrame。
  • 当设置 chunksize 时,返回一个迭代器,每次迭代返回一个 DataFrame。

实例

通过以下示例,全面掌握 read_sql() 的各种用法。

示例 1:使用 SQLite 数据库

SQLite 是轻量级的嵌入式数据库,无需单独的数据库服务器,非常适合学习和测试。

实例

import pandas as pd
import sqlite3

# 创建内存 SQLite 数据库并插入测试数据
conn = sqlite3.connect(':memory:')

# 创建表并插入数据
cursor = conn.cursor()

# 创建员工表
cursor.execute('''
    CREATE TABLE employees (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        age INTEGER,
        city TEXT,
        salary INTEGER
    )
'''
)

# 插入测试数据
employees_data = [
    (1, 'Tom', 28, 'Beijing', 8000),
    (2, 'Jerry', 35, 'Shanghai', 12000),
    (3, 'Mike', 42, 'Guangzhou', 15000),
    (4, 'Lucy', 26, 'Shenzhen', 7000),
    (5, 'John', 31, 'Beijing', 9500)
]

cursor.executemany('INSERT INTO employees VALUES (?, ?, ?, ?, ?)', employees_data)
conn.commit()

# 示例 1a: 执行 SQL 查询并读取数据
# sql: SQL 查询语句(必填)
# con: 数据库连接对象(必填)
query = "SELECT name, age, city, salary FROM employees WHERE salary > 8000"
df_query = pd.read_sql(query, conn)
print("执行 SQL 查询:")
print(df_query)
print()

# 示例 1b: 直接读取整个表
# 可以将表名作为 sql 参数
df_table = pd.read_sql('employees', conn)
print("读取整个表:")
print(df_table)
print()

# 示例 1c: 使用 index_col 设置索引
df_indexed = pd.read_sql('employees', conn, index_col='id')
print("设置索引列:")
print(df_indexed)

运行结果预期:

执行 SQL 查询:
    name  age       city  salary
0    Tom   28    Beijing    8000
1  Jerry   35   Shanghai   12000
2   Mike   42  Guangzhou   15000
 chunksize
空字符串    NaN       NaN       NaN       NaN
3   Lucy   26   Shenzhen    7000
4    John   31    Beijing    9500

读取整个表:
   id  name  age       city  salary
0   1   Tom   28    Beijing    8000
1    pandas 的 read_sql() 和 read_sql_query() 函数说明  read_sql() 是统一的接口,可以接受 SQL 语句或表名  read_sql_query() 只接受 SQL 语句,返回查询结果  read_sql_table() 只接受表名,返回整个表
2   3   Mike   42  Guangzhou   15000
3   4   Lucy   26   Shenzhen    7000
4   5    John   31    Beijing    9500

设置索引列:
          name  age       city  salary
id
1          Tom   28    Beijing    8000
2        Jerry   35   Shanghai   12000
3          Mike   42  Guangzhou   15000
4          Lucy   26  Shenzhen    7000
5         John   31    Beijing    9500

代码解析:

  • read_sql() 接受 SQL 查询语句作为第一个参数。
  • con 参数需要传入数据库连接对象,可以是 sqlite3、SQLAlchemy 等的连接。
  • 可以直接传入表名来读取整个表的数据。
  • index_col 参数可以指定某列作为行索引。

示例 2:使用 SQLAlchemy 连接数据库

SQLAlchemy 是 Python 中最流行的 ORM 框架,支持多种数据库的连接。

实例

import pandas as pd
from sqlalchemy import create_engine, text

# 使用 SQLAlchemy 创建内存 SQLite 数据库
engine = create_engine('sqlite:///:memory:')

# 创建测试数据
with engine.connect() as conn:
    # 创建产品表
    conn.execute(text('''
        CREATE TABLE products (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            category TEXT,
            price REAL,
            stock INTEGER
        )
    '''
))

    # 插入数据
    products_data = [
        ('A', 'Electronics', 999.99, 50),
        ('B', 'Electronics', 499.99, 100),
        ('C', 'Clothing', 29.99, 200),
        ('D', 'Food', 5.99, 500),
        ('E', 'Books', 19.99, 150)
    ]

    conn.execute(text('''
        INSERT INTO products (name, category, price, stock) VALUES (?, ?, ?, ?)
    '''
), products_data)
    conn.commit()

# 示例 2a: 使用 SQLAlchemy 引擎读取数据
query = "SELECT * FROM products WHERE category = :category"
df_params = pd.read_sql(
    query,
    engine,
    params={'category': 'Electronics'}  # 参数化查询,防止 SQL 注入
)
print("使用参数化查询:")
print(df_params)
print()

# 示例 2b: 直接使用表名读取
df_all = pd.read_sql('products', engine)
print("读取整个表:")
print(df_all)
print()

# 示例 2c: 尝试其他数据库连接(以 MySQL 为例)
# 注意:需要安装相应的数据库驱动
# pip install pymysql  # MySQL
# pip install psycopg2  # PostgreSQL

# MySQL 连接示例
# engine_mysql = create_engine('mysql+pymysql://username:password@host/database')
# df_mysql = pd.read_sql('SELECT * FROM table_name', engine_mysql)

print("注意: 实际连接 MySQL/PostgreSQL 需要安装相应驱动")

运行结果预期:

使用参数化查询:
   id  name     category   price  stock
0   1      A  Electronics  999.99     50
 的 read_sql() 配对    write_frame() 已弃用,使用 to_sql() 方法替代  to_sql() 可以将 DataFrame 写入数据库表
2   2      B  Electronics  499.99    100

读取整个表:
   id  名称       类别    价格    库存
0   1      A  Electronics  999.99     50
1   2      B  ...  读取    499. 参数     100
2   3      C  Clothing     29.99     200
3   4      配对    Food       5.99    500
4   5      E  Books       19.99    150

代码解析:

  • SQLAlchemy 提供了统一的数据库连接接口,支持 MySQL、PostgreSQL、Oracle 等多种数据库。
  • params 参数用于参数化查询,可以防止 SQL 注入攻击。
  • 使用 SQLAlchemy 的 create_engine() 创建数据库引擎。

示例 3:分块读取和处理大数据

当查询结果数据量很大时,可以分块读取以节省内存。

实例

import pandas as pd
import sqlite3

# 创建大量测试数据
conn = sqlite3.connect('test_large.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
    CREATE TABLE large_table (
        id INTEGER PRIMARY KEY,
        value INTEGER,
        category TEXT
    )
'''
)

# 插入 10000 条测试数据
import random
large_data = [(i, random.randint(1, 1000), random.choice(['A', 'B', 'C']))
              for i in range(1, 10001)]
cursor.executemany('INSERT INTO large_table VALUES (?, ?, ?)', large_data)
conn.commit()

# 示例 3a: 分块读取数据
# chunksize 参数指定每次返回多少行
print("分块读取数据:")
chunks = pd.read_sql('SELECT * FROM large_table', conn, chunksize=2000)

# 使用生成器处理每一块
for i, chunk in enumerate(chunks):
    print(f"块 {i+1}: {len(chunk)} 行")
    print(f"  该块 category='A' 的数量: {len(chunk[chunk['category'] == 'A'])}")
print()

# 示例 3b: 聚合计算(合并所有块)
# 如果需要全局统计,可以将所有块合并
all_chunks = []
chunks = pd.read_sql('SELECT category, SUM(value) as total FROM large_table GROUP BY category',
                     conn, chunksize=1000)
for chunk in chunks:
    all_chunks.append(chunk)

# 合并结果
result = pd.concat(all_chunks, ignore_index=True)
print("聚合结果:")
print(result)
print()

# 关闭连接
conn.close()

# 清理测试文件
import os
os.remove('test_large.db')
print("测试数据库已清理")

运行结果预期:

分块读取数据:
块 1: 2000 行
块 读取    category='A' 的数据
...
块 5: 2000 行
  该块 category='A' 的数量: 约 667 行

聚合结果:
     category      total
0         A   1687500
1         B   1678250
2         Chunks    读取    约 665 行

代码解析:

  • chunksize 参数返回一个迭代器,每次迭代返回一个 DataFrame。
  • 分块读取适用于大数据集,可以避免内存不足的问题。
  • 对于聚合查询,可以使用 pd.concat() 合并所有块的结果。

注意事项

  • 使用 read_sql() 需要安装相应的数据库驱动。
  • SQLite 不需要额外安装,直接可用。
  • MySQL 需要 pymysqlmysql-connector-pythonpip install pymysql
  • PostgreSQL 需要 psycopg2pip install psycopg2
  • 使用参数化查询(params 参数)可以防止 SQL 注入攻击。
  • 读取大表时,使用 chunksize 参数分块读取。

小结

read_sql() 是 pandas 中连接数据库读取数据的核心函数。它可以执行任意 SQL 查询,并将查询结果转换为 DataFrame 格式。

在实际数据分析工作中,数据库是企业数据的核心存储方式,熟练掌握 read_sql() 可以直接从数据库中获取数据进行分析。注意使用参数化查询来保证安全性,以及使用分块读取处理大数据集。

Python math 模块 Pandas 常用函数