Skills 性能优化
当 Skill 处理大文件或被频繁调用时,性能问题会直接影响用户体验。
本篇介绍在脚本层面提升处理速度的常用技巧。
性能问题的常见来源
| 来源 | 典型表现 | 优化方向 |
|---|---|---|
| 一次性加载大文件 | 读取 100MB CSV 耗时数秒 | 分块读取(chunk) |
| 重复执行相同计算 | 每次调用都重新计算统计值 | 缓存结果 |
| 单线程串行处理 | 处理 1000 个文件需要很长时间 | 并行处理 |
| 频繁的磁盘 I/O | 循环中逐行写入文件 | 批量写入 |
| 不必要的依赖加载 | import 耗时 2 秒以上 | 延迟导入 |
大文件分块读取
pandas 的 chunksize 参数允许将大文件分成若干批次逐块处理,避免一次性占用过多内存。
实例
# 文件路径:scripts/chunked_reader.py
import pandas as pd
import json
import sys
def process_large_csv(file_path: str, chunk_size: int = 10000) -> dict:
"""
分块读取并处理大型 CSV 文件
参数:
file_path: CSV 文件路径
chunk_size: 每块的行数,默认 10000
返回:
合并后的统计结果
"""
total_rows = 0
total_sum = 0.0
chunk_count = 0
# 逐块读取,不会一次性加载整个文件到内存
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
chunk_count += 1
total_rows += len(chunk)
# 对每块执行统计,结果累计
if "score" in chunk.columns:
total_sum += chunk["score"].sum()
# 实时汇报进度
print(f" 已处理第 {chunk_count} 批,累计 {total_rows} 行", flush=True)
avg = total_sum / total_rows if total_rows > 0 else 0
return {
"status": "success",
"total_rows": total_rows,
"chunks": chunk_count,
"score_avg": round(avg, 2)
}
if __name__ == "__main__":
file_path = sys.argv[1] if len(sys.argv) > 1 else ""
result = process_large_csv(file_path)
print(json.dumps(result, ensure_ascii=False, indent=2))
import pandas as pd
import json
import sys
def process_large_csv(file_path: str, chunk_size: int = 10000) -> dict:
"""
分块读取并处理大型 CSV 文件
参数:
file_path: CSV 文件路径
chunk_size: 每块的行数,默认 10000
返回:
合并后的统计结果
"""
total_rows = 0
total_sum = 0.0
chunk_count = 0
# 逐块读取,不会一次性加载整个文件到内存
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
chunk_count += 1
total_rows += len(chunk)
# 对每块执行统计,结果累计
if "score" in chunk.columns:
total_sum += chunk["score"].sum()
# 实时汇报进度
print(f" 已处理第 {chunk_count} 批,累计 {total_rows} 行", flush=True)
avg = total_sum / total_rows if total_rows > 0 else 0
return {
"status": "success",
"total_rows": total_rows,
"chunks": chunk_count,
"score_avg": round(avg, 2)
}
if __name__ == "__main__":
file_path = sys.argv[1] if len(sys.argv) > 1 else ""
result = process_large_csv(file_path)
print(json.dumps(result, ensure_ascii=False, indent=2))
已处理第 1 批,累计 10000 行
已处理第 2 批,累计 20000 行
已处理第 3 批,累计 28456 行
{
"status": "success",
"total_rows": 28456,
"chunks": 3,
"score_avg": 82.37
}
结果缓存
对于相同输入会产生相同结果的操作,将结果缓存到本地文件,避免重复计算。
实例
# 文件路径:scripts/file_cache.py
import hashlib
import json
import os
import time
CACHE_DIR = "/home/claude/.skill_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
def _file_fingerprint(file_path: str) -> str:
"""
根据文件路径、大小和修改时间生成指纹
比 MD5 文件内容快得多,适合大文件
"""
stat = os.stat(file_path)
raw = f"{file_path}|{stat.st_size}|{stat.st_mtime}"
return hashlib.md5(raw.encode()).hexdigest()
def get_cached(file_path: str, operation: str):
"""取缓存,不存在时返回 None"""
key = _file_fingerprint(file_path) + "_" + operation
cache_file = os.path.join(CACHE_DIR, f"{key}.json")
if os.path.exists(cache_file):
with open(cache_file) as f:
return json.load(f)
return None
def set_cached(file_path: str, operation: str, result: dict):
"""写入缓存"""
key = _file_fingerprint(file_path) + "_" + operation
cache_file = os.path.join(CACHE_DIR, f"{key}.json")
with open(cache_file, "w") as f:
json.dump(result, f, ensure_ascii=False)
# 使用示例
def get_stats(file_path: str) -> dict:
# 先查缓存
cached = get_cached(file_path, "stats")
if cached:
print("命中缓存,跳过重复计算")
return cached
# 缓存未命中,执行计算
import pandas as pd
t0 = time.time()
df = pd.read_csv(file_path)
result = {
"rows": len(df),
"cols": len(df.columns),
"elapsed": round(time.time() - t0, 3)
}
# 写入缓存供下次使用
set_cached(file_path, "stats", result)
return result
import hashlib
import json
import os
import time
CACHE_DIR = "/home/claude/.skill_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
def _file_fingerprint(file_path: str) -> str:
"""
根据文件路径、大小和修改时间生成指纹
比 MD5 文件内容快得多,适合大文件
"""
stat = os.stat(file_path)
raw = f"{file_path}|{stat.st_size}|{stat.st_mtime}"
return hashlib.md5(raw.encode()).hexdigest()
def get_cached(file_path: str, operation: str):
"""取缓存,不存在时返回 None"""
key = _file_fingerprint(file_path) + "_" + operation
cache_file = os.path.join(CACHE_DIR, f"{key}.json")
if os.path.exists(cache_file):
with open(cache_file) as f:
return json.load(f)
return None
def set_cached(file_path: str, operation: str, result: dict):
"""写入缓存"""
key = _file_fingerprint(file_path) + "_" + operation
cache_file = os.path.join(CACHE_DIR, f"{key}.json")
with open(cache_file, "w") as f:
json.dump(result, f, ensure_ascii=False)
# 使用示例
def get_stats(file_path: str) -> dict:
# 先查缓存
cached = get_cached(file_path, "stats")
if cached:
print("命中缓存,跳过重复计算")
return cached
# 缓存未命中,执行计算
import pandas as pd
t0 = time.time()
df = pd.read_csv(file_path)
result = {
"rows": len(df),
"cols": len(df.columns),
"elapsed": round(time.time() - t0, 3)
}
# 写入缓存供下次使用
set_cached(file_path, "stats", result)
return result
并行处理多个文件
当需要处理多个文件时,使用 concurrent.futures 并行执行,速度可提升数倍。
实例
# 文件路径:scripts/parallel_process.py
import concurrent.futures
import os
import json
import sys
def process_single_file(file_path: str) -> dict:
"""处理单个文件(会被并行调用)"""
try:
size = os.path.getsize(file_path)
# 模拟实际处理逻辑
return {"file": os.path.basename(file_path),
"size_kb": size // 1024, "status": "ok"}
except Exception as e:
return {"file": file_path, "status": "error", "message": str(e)}
def process_files_parallel(file_paths: list, max_workers: int = 4) -> list:
"""
并行处理多个文件
参数:
file_paths: 文件路径列表
max_workers: 最大并发数,默认 4(避免过多进程争抢资源)
"""
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_map = {
executor.submit(process_single_file, fp): fp
for fp in file_paths
}
# 按完成顺序收集结果
for future in concurrent.futures.as_completed(future_map):
result = future.result()
results.append(result)
print(f" 完成:{result['file']}")
return results
if __name__ == "__main__":
# 模拟:处理上传目录中的所有 CSV 文件
upload_dir = "/mnt/user-data/uploads"
csv_files = [
os.path.join(upload_dir, f)
for f in os.listdir(upload_dir)
if f.endswith(".csv")
]
if not csv_files:
print("未找到 CSV 文件")
sys.exit(0)
print(f"开始并行处理 {len(csv_files)} 个文件...")
results = process_files_parallel(csv_files)
print(json.dumps(results, ensure_ascii=False, indent=2))
import concurrent.futures
import os
import json
import sys
def process_single_file(file_path: str) -> dict:
"""处理单个文件(会被并行调用)"""
try:
size = os.path.getsize(file_path)
# 模拟实际处理逻辑
return {"file": os.path.basename(file_path),
"size_kb": size // 1024, "status": "ok"}
except Exception as e:
return {"file": file_path, "status": "error", "message": str(e)}
def process_files_parallel(file_paths: list, max_workers: int = 4) -> list:
"""
并行处理多个文件
参数:
file_paths: 文件路径列表
max_workers: 最大并发数,默认 4(避免过多进程争抢资源)
"""
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_map = {
executor.submit(process_single_file, fp): fp
for fp in file_paths
}
# 按完成顺序收集结果
for future in concurrent.futures.as_completed(future_map):
result = future.result()
results.append(result)
print(f" 完成:{result['file']}")
return results
if __name__ == "__main__":
# 模拟:处理上传目录中的所有 CSV 文件
upload_dir = "/mnt/user-data/uploads"
csv_files = [
os.path.join(upload_dir, f)
for f in os.listdir(upload_dir)
if f.endswith(".csv")
]
if not csv_files:
print("未找到 CSV 文件")
sys.exit(0)
print(f"开始并行处理 {len(csv_files)} 个文件...")
results = process_files_parallel(csv_files)
print(json.dumps(results, ensure_ascii=False, indent=2))
开始并行处理 3 个文件...
完成:runoob_jan.csv
完成:runoob_mar.csv
完成:runoob_feb.csv
[{"file": "runoob_jan.csv", "size_kb": 128, "status": "ok"}, ...]
性能优化核查清单
| 检查项 | 优化前 | 优化后 |
|---|---|---|
| 读取大文件 | pd.read_csv(file) | pd.read_csv(file, chunksize=10000) |
| 重复统计同一文件 | 每次都重新计算 | 使用文件指纹缓存结果 |
| 处理多个文件 | for 循环串行 | ThreadPoolExecutor 并行 |
| 逐行写入文件 | for row: f.write(row) | 批量收集后一次写入 |
| 不必要的 import | 脚本顶部全部 import | 在用到时再 import(延迟导入) |
优化前先测量,不要凭感觉优化。用
time.perf_counter()或@timeit装饰器找出真正的瓶颈,再有针对性地优化,避免过早优化带来不必要的代码复杂度。
