现在位置: 首页 > Skills 教程 > 正文

Skills 性能优化

当 Skill 处理大文件或被频繁调用时,性能问题会直接影响用户体验。

本篇介绍在脚本层面提升处理速度的常用技巧。


性能问题的常见来源

来源典型表现优化方向
一次性加载大文件读取 100MB CSV 耗时数秒分块读取(chunk)
重复执行相同计算每次调用都重新计算统计值缓存结果
单线程串行处理处理 1000 个文件需要很长时间并行处理
频繁的磁盘 I/O循环中逐行写入文件批量写入
不必要的依赖加载import 耗时 2 秒以上延迟导入

大文件分块读取

pandas 的 chunksize 参数允许将大文件分成若干批次逐块处理,避免一次性占用过多内存。

实例

# 文件路径:scripts/chunked_reader.py
import pandas as pd
import json
import sys

def process_large_csv(file_path: str, chunk_size: int = 10000) -> dict:
    """
    分块读取并处理大型 CSV 文件

    参数:
        file_path:  CSV 文件路径
        chunk_size: 每块的行数,默认 10000

    返回:
        合并后的统计结果
    """

    total_rows   = 0
    total_sum    = 0.0
    chunk_count  = 0

    # 逐块读取,不会一次性加载整个文件到内存
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        chunk_count += 1
        total_rows  += len(chunk)

        # 对每块执行统计,结果累计
        if "score" in chunk.columns:
            total_sum += chunk["score"].sum()

        # 实时汇报进度
        print(f"  已处理第 {chunk_count} 批,累计 {total_rows} 行", flush=True)

    avg = total_sum / total_rows if total_rows > 0 else 0
    return {
        "status":      "success",
        "total_rows":  total_rows,
        "chunks":      chunk_count,
        "score_avg":   round(avg, 2)
    }

if __name__ == "__main__":
    file_path = sys.argv[1] if len(sys.argv) > 1 else ""
    result    = process_large_csv(file_path)
    print(json.dumps(result, ensure_ascii=False, indent=2))
  已处理第 1 批,累计 10000 行
  已处理第 2 批,累计 20000 行
  已处理第 3 批,累计 28456 行
{
  "status": "success",
  "total_rows": 28456,
  "chunks": 3,
  "score_avg": 82.37
}

结果缓存

对于相同输入会产生相同结果的操作,将结果缓存到本地文件,避免重复计算。

实例

# 文件路径:scripts/file_cache.py
import hashlib
import json
import os
import time

CACHE_DIR = "/home/claude/.skill_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

def _file_fingerprint(file_path: str) -> str:
    """
    根据文件路径、大小和修改时间生成指纹
    比 MD5 文件内容快得多,适合大文件
    """

    stat = os.stat(file_path)
    raw  = f"{file_path}|{stat.st_size}|{stat.st_mtime}"
    return hashlib.md5(raw.encode()).hexdigest()

def get_cached(file_path: str, operation: str):
    """取缓存,不存在时返回 None"""
    key        = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    if os.path.exists(cache_file):
        with open(cache_file) as f:
            return json.load(f)
    return None

def set_cached(file_path: str, operation: str, result: dict):
    """写入缓存"""
    key        = _file_fingerprint(file_path) + "_" + operation
    cache_file = os.path.join(CACHE_DIR, f"{key}.json")
    with open(cache_file, "w") as f:
        json.dump(result, f, ensure_ascii=False)

# 使用示例
def get_stats(file_path: str) -> dict:
    # 先查缓存
    cached = get_cached(file_path, "stats")
    if cached:
        print("命中缓存,跳过重复计算")
        return cached

    # 缓存未命中,执行计算
    import pandas as pd
    t0 = time.time()
    df = pd.read_csv(file_path)
    result = {
        "rows":    len(df),
        "cols":    len(df.columns),
        "elapsed": round(time.time() - t0, 3)
    }

    # 写入缓存供下次使用
    set_cached(file_path, "stats", result)
    return result

并行处理多个文件

当需要处理多个文件时,使用 concurrent.futures 并行执行,速度可提升数倍。

实例

# 文件路径:scripts/parallel_process.py
import concurrent.futures
import os
import json
import sys

def process_single_file(file_path: str) -> dict:
    """处理单个文件(会被并行调用)"""
    try:
        size = os.path.getsize(file_path)
        # 模拟实际处理逻辑
        return {"file": os.path.basename(file_path),
                "size_kb": size // 1024, "status": "ok"}
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}

def process_files_parallel(file_paths: list, max_workers: int = 4) -> list:
    """
    并行处理多个文件

    参数:
        file_paths:  文件路径列表
        max_workers: 最大并发数,默认 4(避免过多进程争抢资源)
    """

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_map = {
            executor.submit(process_single_file, fp): fp
            for fp in file_paths
        }
        # 按完成顺序收集结果
        for future in concurrent.futures.as_completed(future_map):
            result = future.result()
            results.append(result)
            print(f"  完成:{result['file']}")
    return results

if __name__ == "__main__":
    # 模拟:处理上传目录中的所有 CSV 文件
    upload_dir = "/mnt/user-data/uploads"
    csv_files  = [
        os.path.join(upload_dir, f)
        for f in os.listdir(upload_dir)
        if f.endswith(".csv")
    ]

    if not csv_files:
        print("未找到 CSV 文件")
        sys.exit(0)

    print(f"开始并行处理 {len(csv_files)} 个文件...")
    results = process_files_parallel(csv_files)
    print(json.dumps(results, ensure_ascii=False, indent=2))
开始并行处理 3 个文件...
  完成:runoob_jan.csv
  完成:runoob_mar.csv
  完成:runoob_feb.csv
[{"file": "runoob_jan.csv", "size_kb": 128, "status": "ok"}, ...]

性能优化核查清单

检查项优化前优化后
读取大文件pd.read_csv(file)pd.read_csv(file, chunksize=10000)
重复统计同一文件每次都重新计算使用文件指纹缓存结果
处理多个文件for 循环串行ThreadPoolExecutor 并行
逐行写入文件for row: f.write(row)批量收集后一次写入
不必要的 import脚本顶部全部 import在用到时再 import(延迟导入)

优化前先测量,不要凭感觉优化。用 time.perf_counter()@timeit 装饰器找出真正的瓶颈,再有针对性地优化,避免过早优化带来不必要的代码复杂度。