现在位置: 首页 > Pandas 教程 > 正文

Pandas 处理大文件(chunksize)

处理大型数据文件时,内存可能不足以一次性加载所有数据。Pandas 提供了分块读取功能,可以分批处理数据。


分块读取 CSV

chunksize 参数

实例

import pandas as pd
import io

# 模拟大文件内容(实际使用时替换为文件路径)
data = """id,value
1,100
2,200
3,300
4,400
5,500
6,600
7,700
8,800
9,900
10,1000
"""


# 使用 chunksize 分块读取
chunks = pd.read_csv(io.StringIO(data), chunksize=3)

print("分块处理:")
for i, chunk in enumerate(chunks):
    print(f"\n块 {i+1}:")
    print(chunk)

聚合处理

实例

import pandas as pd
import io
import numpy as np

# 模拟大文件
data = "value\n" + "\n".join([str(i) for i in range(1, 1001)])

# 分块计算总和
total = 0
count = 0

for chunk in pd.read_csv(io.StringIO(data), chunksize=100):
    total += chunk["value"].sum()
    count += len(chunk)

print(f"总行数: {count}")
print(f"总和: {total}")
print(f"平均值: {total / count}")

增量处理

筛选过滤

实例

import pandas as pd
import io

# 模拟数据
data = """id,value,category
1,100,A
2,200,B
3,150,A
4,300,C
5,250,B
6,180,A
"""


# 筛选特定条件的数据
filtered_chunks = []

for chunk in pd.read_csv(io.StringIO(data), chunksize=2):
    filtered = chunk[chunk["category"] == "A"]
    if len(filtered) > 0:
        filtered_chunks.append(filtered)

result = pd.concat(filtered_chunks)
print("筛选 category='A' 的数据:")
print(result)

多进程并行处理

实例

import pandas as pd
import io
from concurrent.futures import ProcessPoolExecutor

# 并行处理(适用于CPU密集型任务)
def process_chunk(chunk):
    """处理单个数据块"""
    return chunk["value"].sum()

# 模拟数据
data = "value\n" + "\n".join([str(i) for i in range(1, 101)])

# 准备数据块
chunks = list(pd.read_csv(io.StringIO(data), chunksize=10))

# 串行处理
total = sum(process_chunk(chunk) for chunk in chunks)
print(f"串行处理总和: {total}")

处理 JSON 大文件

实例

import pandas as pd
import json
import io

# 模拟 JSON Lines 格式数据
jsonl_data = '\n'.join([
    json.dumps({"id": i, "value": i * 10})
    for i in range(1, 101)
])

# 分块读取 JSON Lines
chunks = []
chunk_size = 20

for chunk in pd.read_json(io.StringIO(jsonl_data), lines=True, chunksize=chunk_size):
    chunks.append(chunk)
    if len(chunks) >= 3:  # 只处理前3块作为示例
        break

result = pd.concat(chunks)
print(f"读取了 {len(result)} 行数据")
print(result.head())

分块处理可以显著降低内存占用,但会增加处理时间。合理选择 chunk 大小以平衡内存和时间。