多 Skill 协作工作流
本项目将前三个实战项目整合为一个完整的"数据报告生成"工作流。
目标:掌握多 Skill 编排,以及如何设计一个统筹全局的编排 Skill。
工作流目标
用户上传一份业务数据文件,系统自动完成以下全链路工作,最终交付一份完整的业务报告。
编排 Skill 的设计原则
编排 Skill 不做具体的业务处理,只负责调度其他 Skill 的输出、传递中间结果、在关键节点向用户汇报进度。
| 编排 Skill 做的事 | 编排 Skill 不做的事 |
|---|---|
| 按顺序调用子 Skill 的脚本 | 重复实现子 Skill 已有的功能 |
| 在步骤间传递文件路径 | 直接操作数据内容 |
| 向用户汇报整体进度 | 关注单个步骤的内部细节 |
| 处理步骤失败后的补救逻辑 | 捕获子 Skill 内部的异常 |
编排脚本
实例
# 文件路径:scripts/orchestrator.py
# 多 Skill 协作的编排入口脚本
import subprocess
import sys
import json
import os
import shutil
SKILLS_BASE = "/mnt/skills/public"
WORK_DIR = "/home/claude/report_work"
OUTPUT_DIR = "/mnt/user-data/outputs"
def run(script_path: str, *args) -> dict:
"""运行指定脚本,返回 JSON 结果"""
cmd = [sys.executable, script_path] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {
"status": "error",
"message": result.stderr or result.stdout or "脚本无输出"
}
def orchestrate(input_file: str) -> dict:
"""执行完整的报告生成工作流"""
os.makedirs(WORK_DIR, exist_ok=True)
# ── 阶段一:数据清洗 ─────────────────────────────────
print("【1/3】正在清洗数据...", flush=True)
clean_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/clean_data.py",
input_file,
os.path.join(WORK_DIR, "cleaned.csv")
)
if clean_result.get("status") != "success":
return {"status": "error", "stage": "数据清洗",
"message": clean_result.get("message")}
print(f" 已清洗:删除重复行 {clean_result.get('dup_removed', 0)} 条,"
f"填充空值 {clean_result.get('nulls_filled', 0)} 处", flush=True)
# ── 阶段二:统计分析 ─────────────────────────────────
print("【2/3】正在生成统计报告...", flush=True)
stats_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/calc_stats.py",
clean_result["output"],
os.path.join(WORK_DIR, "stats.json")
)
if stats_result.get("status") != "success":
return {"status": "error", "stage": "统计分析",
"message": stats_result.get("message")}
# ── 阶段三:生成 Excel 报告 ──────────────────────────
print("【3/3】正在生成 Excel 报告...", flush=True)
output_name = os.path.splitext(os.path.basename(input_file))[0] + "_report.xlsx"
output_path = os.path.join(OUTPUT_DIR, output_name)
report_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/gen_report.py",
clean_result["output"],
stats_result.get("output", ""),
output_path
)
if report_result.get("status") != "success":
return {"status": "error", "stage": "报告生成",
"message": report_result.get("message")}
# 清理工作目录
shutil.rmtree(WORK_DIR, ignore_errors=True)
return {
"status": "success",
"output": output_path,
"clean_stats": {
"dup_removed": clean_result.get("dup_removed", 0),
"nulls_filled": clean_result.get("nulls_filled", 0)
}
}
if __name__ == "__main__":
if len(sys.argv) < 2:
print(json.dumps({"status": "error", "message": "用法:python orchestrator.py <文件路径>"}))
sys.exit(1)
result = orchestrate(sys.argv[1])
print(json.dumps(result, ensure_ascii=False, indent=2))
# 多 Skill 协作的编排入口脚本
import subprocess
import sys
import json
import os
import shutil
SKILLS_BASE = "/mnt/skills/public"
WORK_DIR = "/home/claude/report_work"
OUTPUT_DIR = "/mnt/user-data/outputs"
def run(script_path: str, *args) -> dict:
"""运行指定脚本,返回 JSON 结果"""
cmd = [sys.executable, script_path] + list(args)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
try:
return json.loads(result.stdout)
except json.JSONDecodeError:
return {
"status": "error",
"message": result.stderr or result.stdout or "脚本无输出"
}
def orchestrate(input_file: str) -> dict:
"""执行完整的报告生成工作流"""
os.makedirs(WORK_DIR, exist_ok=True)
# ── 阶段一:数据清洗 ─────────────────────────────────
print("【1/3】正在清洗数据...", flush=True)
clean_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/clean_data.py",
input_file,
os.path.join(WORK_DIR, "cleaned.csv")
)
if clean_result.get("status") != "success":
return {"status": "error", "stage": "数据清洗",
"message": clean_result.get("message")}
print(f" 已清洗:删除重复行 {clean_result.get('dup_removed', 0)} 条,"
f"填充空值 {clean_result.get('nulls_filled', 0)} 处", flush=True)
# ── 阶段二:统计分析 ─────────────────────────────────
print("【2/3】正在生成统计报告...", flush=True)
stats_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/calc_stats.py",
clean_result["output"],
os.path.join(WORK_DIR, "stats.json")
)
if stats_result.get("status") != "success":
return {"status": "error", "stage": "统计分析",
"message": stats_result.get("message")}
# ── 阶段三:生成 Excel 报告 ──────────────────────────
print("【3/3】正在生成 Excel 报告...", flush=True)
output_name = os.path.splitext(os.path.basename(input_file))[0] + "_report.xlsx"
output_path = os.path.join(OUTPUT_DIR, output_name)
report_result = run(
f"{SKILLS_BASE}/data-cleaner/scripts/gen_report.py",
clean_result["output"],
stats_result.get("output", ""),
output_path
)
if report_result.get("status") != "success":
return {"status": "error", "stage": "报告生成",
"message": report_result.get("message")}
# 清理工作目录
shutil.rmtree(WORK_DIR, ignore_errors=True)
return {
"status": "success",
"output": output_path,
"clean_stats": {
"dup_removed": clean_result.get("dup_removed", 0),
"nulls_filled": clean_result.get("nulls_filled", 0)
}
}
if __name__ == "__main__":
if len(sys.argv) < 2:
print(json.dumps({"status": "error", "message": "用法:python orchestrator.py <文件路径>"}))
sys.exit(1)
result = orchestrate(sys.argv[1])
print(json.dumps(result, ensure_ascii=False, indent=2))
【1/3】正在清洗数据...
已清洗:删除重复行 5 条,填充空值 18 处
【2/3】正在生成统计报告...
【3/3】正在生成 Excel 报告...
{
"status": "success",
"output": "/mnt/user-data/outputs/runoob_sales_report.xlsx",
"clean_stats": {"dup_removed": 5, "nulls_filled": 18}
}
编排 Skill 的 SKILL.md
--- name: report-pipeline version: 1.0.0 description: > 完整的数据报告生成流水线:自动对上传的 CSV/Excel 文件执行数据清洗、 统计分析并生成 Excel 报告。当用户需要一键生成数据报告、 从原始数据直接输出分析报告时触发。 --- # 数据报告流水线 ## 前置条件 需要以下 Skills 已安装: - data-cleaner v1.0+ - 本 Skill 的脚本位于 /mnt/skills/public/report-pipeline/ ## 执行 获取用户上传的文件路径后,直接运行编排脚本: ```bash python scripts/orchestrator.py <文件路径> ``` 脚本会实时输出每阶段的进度,完成后输出最终文件路径。 解析 JSON 输出后调用 present_files 展示报告文件。 ## 失败处理 若任意阶段失败,输出包含 stage 和 message 的错误信息, 告知用户是哪个阶段失败以及可能的原因,不要继续执行后续阶段。
