Skill 数据清洗与分析
本项目构建一个数据清洗 Skill,自动处理用户上传的 CSV/Excel 数据,完成清洗、统计和报告生成。
目标:掌握 pandas 数据处理流水线与多步骤 Skill 的设计。
项目功能概览
| 步骤 | 操作 | 输出 |
|---|---|---|
| 1. 读取数据 | 支持 CSV、Excel | 行数、列名、数据类型 |
| 2. 质量检测 | 空值、重复行、异常值扫描 | 质量问题报告 |
| 3. 数据清洗 | 去重、填充空值、标准化列名 | 清洗后的 CSV |
| 4. 统计分析 | 均值、中位数、分布 | 统计摘要表格 |
| 5. 生成报告 | 汇总以上结果 | Excel 报告文件 |
目录结构
data-cleaner/
├── SKILL.md
└── scripts/
├── requirements.txt
├── load_data.py
├── quality_check.py
├── clean_data.py
├── calc_stats.py
└── gen_report.py
步骤一:加载数据
实例
# 文件路径:scripts/load_data.py
import pandas as pd
import sys, json, os
def load(file_path: str) -> dict:
"""加载 CSV 或 Excel 文件,返回基本信息"""
if not os.path.exists(file_path):
return {"status": "error", "message": f"文件不存在:{file_path}"}
ext = os.path.splitext(file_path)[1].lower()
try:
if ext == ".csv":
df = pd.read_csv(file_path, encoding="utf-8")
elif ext in (".xlsx", ".xls"):
df = pd.read_excel(file_path)
else:
return {"status": "error",
"message": f"不支持的格式:{ext},请使用 .csv 或 .xlsx"}
except UnicodeDecodeError:
# 尝试 GBK 编码(常见于中文 Windows 导出的 CSV)
df = pd.read_csv(file_path, encoding="gbk")
# 将 df 保存到临时文件供后续步骤使用
tmp_path = "/home/claude/loaded_data.csv"
df.to_csv(tmp_path, index=False, encoding="utf-8")
return {
"status": "success",
"rows": len(df),
"cols": len(df.columns),
"columns": list(df.columns),
"dtypes": df.dtypes.astype(str).to_dict(),
"tmp_path": tmp_path
}
if __name__ == "__main__":
print(json.dumps(load(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
import pandas as pd
import sys, json, os
def load(file_path: str) -> dict:
"""加载 CSV 或 Excel 文件,返回基本信息"""
if not os.path.exists(file_path):
return {"status": "error", "message": f"文件不存在:{file_path}"}
ext = os.path.splitext(file_path)[1].lower()
try:
if ext == ".csv":
df = pd.read_csv(file_path, encoding="utf-8")
elif ext in (".xlsx", ".xls"):
df = pd.read_excel(file_path)
else:
return {"status": "error",
"message": f"不支持的格式:{ext},请使用 .csv 或 .xlsx"}
except UnicodeDecodeError:
# 尝试 GBK 编码(常见于中文 Windows 导出的 CSV)
df = pd.read_csv(file_path, encoding="gbk")
# 将 df 保存到临时文件供后续步骤使用
tmp_path = "/home/claude/loaded_data.csv"
df.to_csv(tmp_path, index=False, encoding="utf-8")
return {
"status": "success",
"rows": len(df),
"cols": len(df.columns),
"columns": list(df.columns),
"dtypes": df.dtypes.astype(str).to_dict(),
"tmp_path": tmp_path
}
if __name__ == "__main__":
print(json.dumps(load(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
步骤二:质量检测
实例
# 文件路径:scripts/quality_check.py
import pandas as pd
import sys, json
def check(file_path: str) -> dict:
"""扫描数据质量问题"""
df = pd.read_csv(file_path)
# 空值统计
null_counts = df.isnull().sum()
null_issues = [
{"column": col, "null_count": int(cnt),
"null_pct": round(cnt / len(df) * 100, 1)}
for col, cnt in null_counts.items() if cnt > 0
]
# 重复行
dup_count = int(df.duplicated().sum())
# 异常值检测(数值列的 IQR 方法)
outlier_issues = []
for col in df.select_dtypes(include="number").columns:
q1, q3 = df[col].quantile(0.25), df[col].quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
outliers = df[(df[col] < lower) | (df[col] > upper)]
if len(outliers) > 0:
outlier_issues.append({
"column": col,
"count": len(outliers),
"range": f"[{lower:.2f}, {upper:.2f}]"
})
issues_count = len(null_issues) + (1 if dup_count > 0 else 0) + len(outlier_issues)
return {
"status": "success",
"total_issues": issues_count,
"duplicate_rows": dup_count,
"null_columns": null_issues,
"outlier_columns": outlier_issues
}
if __name__ == "__main__":
print(json.dumps(check(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
import pandas as pd
import sys, json
def check(file_path: str) -> dict:
"""扫描数据质量问题"""
df = pd.read_csv(file_path)
# 空值统计
null_counts = df.isnull().sum()
null_issues = [
{"column": col, "null_count": int(cnt),
"null_pct": round(cnt / len(df) * 100, 1)}
for col, cnt in null_counts.items() if cnt > 0
]
# 重复行
dup_count = int(df.duplicated().sum())
# 异常值检测(数值列的 IQR 方法)
outlier_issues = []
for col in df.select_dtypes(include="number").columns:
q1, q3 = df[col].quantile(0.25), df[col].quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
outliers = df[(df[col] < lower) | (df[col] > upper)]
if len(outliers) > 0:
outlier_issues.append({
"column": col,
"count": len(outliers),
"range": f"[{lower:.2f}, {upper:.2f}]"
})
issues_count = len(null_issues) + (1 if dup_count > 0 else 0) + len(outlier_issues)
return {
"status": "success",
"total_issues": issues_count,
"duplicate_rows": dup_count,
"null_columns": null_issues,
"outlier_columns": outlier_issues
}
if __name__ == "__main__":
print(json.dumps(check(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
步骤三:数据清洗
实例
# 文件路径:scripts/clean_data.py
import pandas as pd
import sys, json, os
def clean(file_path: str, output_path: str = "/home/claude/cleaned_data.csv") -> dict:
"""执行标准清洗流程"""
df = pd.read_csv(file_path)
stats = {}
# 1. 标准化列名(去除空格,转小写,空格换下划线)
original_cols = list(df.columns)
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
stats["cols_renamed"] = sum(a != b for a, b in zip(original_cols, df.columns))
# 2. 删除完全重复的行
before = len(df)
df.drop_duplicates(inplace=True)
stats["dup_removed"] = before - len(df)
# 3. 数值列空值填 0,字符串列填空字符串
null_before = int(df.isnull().sum().sum())
for col in df.columns:
if df[col].dtype in ("int64", "float64"):
df[col].fillna(0, inplace=True)
else:
df[col].fillna("", inplace=True)
stats["nulls_filled"] = null_before - int(df.isnull().sum().sum())
# 4. 去除字符串首尾空格
for col in df.select_dtypes(include="object").columns:
df[col] = df[col].str.strip()
df.to_csv(output_path, index=False, encoding="utf-8")
return {
"status": "success",
"output": output_path,
"rows_after": len(df),
"cols_renamed": stats["cols_renamed"],
"dup_removed": stats["dup_removed"],
"nulls_filled": stats["nulls_filled"]
}
if __name__ == "__main__":
print(json.dumps(clean(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
import pandas as pd
import sys, json, os
def clean(file_path: str, output_path: str = "/home/claude/cleaned_data.csv") -> dict:
"""执行标准清洗流程"""
df = pd.read_csv(file_path)
stats = {}
# 1. 标准化列名(去除空格,转小写,空格换下划线)
original_cols = list(df.columns)
df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
stats["cols_renamed"] = sum(a != b for a, b in zip(original_cols, df.columns))
# 2. 删除完全重复的行
before = len(df)
df.drop_duplicates(inplace=True)
stats["dup_removed"] = before - len(df)
# 3. 数值列空值填 0,字符串列填空字符串
null_before = int(df.isnull().sum().sum())
for col in df.columns:
if df[col].dtype in ("int64", "float64"):
df[col].fillna(0, inplace=True)
else:
df[col].fillna("", inplace=True)
stats["nulls_filled"] = null_before - int(df.isnull().sum().sum())
# 4. 去除字符串首尾空格
for col in df.select_dtypes(include="object").columns:
df[col] = df[col].str.strip()
df.to_csv(output_path, index=False, encoding="utf-8")
return {
"status": "success",
"output": output_path,
"rows_after": len(df),
"cols_renamed": stats["cols_renamed"],
"dup_removed": stats["dup_removed"],
"nulls_filled": stats["nulls_filled"]
}
if __name__ == "__main__":
print(json.dumps(clean(sys.argv[1] if len(sys.argv) > 1 else ""),
ensure_ascii=False, indent=2))
SKILL.md 完整内容
--- name: data-cleaner version: 1.0.0 description: > 清洗并分析用户上传的 CSV/Excel 数据,包含质量检测、去重、 空值处理、统计分析和报告生成。当用户需要数据清洗、数据预处理、 统计分析、数据质量报告时触发。 --- # 数据清洗与分析助手 ## 执行流程 ### 第一步:加载数据 ```bash cd scripts/ python load_data.py <文件路径> ``` 告知用户数据规模(行数、列数)和列名。 ### 第二步:质量检测 ```bash python quality_check.py /home/claude/loaded_data.csv ``` 展示质量报告:有哪些列有空值、有多少重复行、哪些列有异常值。 询问用户:是否按默认规则清洗?或需要自定义处理方式? ### 第三步:执行清洗 用户确认后执行: ```bash python clean_data.py /home/claude/loaded_data.csv ``` 告知清洗结果:删除了多少重复行、填充了多少空值。 ### 第四步:输出清洗后的文件 将 /home/claude/cleaned_data.csv 复制到输出目录后调用 present_files 展示。
