Skills 异步与并发处理
当 Skill 需要同时等待多个耗时操作(如网络请求、文件 I/O)时,异步编程能大幅减少总等待时间。
本篇介绍 Python asyncio 在 Skill 脚本中的应用场景与写法。
同步 vs 异步:何时该用异步
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 单个文件处理、单次 API 调用 | 同步 | 异步收益不明显,代码更简单 |
| 同时发出多个 HTTP 请求 | 异步 | 并发等待,总时间 ≈ 最长单次时间 |
| 同时读取多个文件 | 异步或线程池 | I/O 等待时可切换任务 |
| CPU 密集型计算(如图像处理) | 多进程 | asyncio 无法绕过 GIL,不能利用多核并行计算 |
异步不等于并行。Python 的 asyncio 基于单线程事件循环,适合 I/O 密集型任务(等待网络、磁盘);CPU 密集型任务应使用 ProcessPoolExecutor,交给多个进程并行执行。
异步发送多个 HTTP 请求
假设 Skill 需要同时调用 3 个 API 接口,同步方式总耗时是三次请求之和,异步方式总耗时约等于最慢的那一次。
实例
# 文件路径:scripts/async_requests.py
# 异步并发调用多个 API,汇总结果
import asyncio
import aiohttp # pip install aiohttp --break-system-packages
import json
import sys
import time
import os

# Credentials and endpoint for the example API. The key is read from the
# API_KEY environment variable so a real secret never has to be written into
# the script; the literal is only a placeholder fallback for the demo.
API_KEY = os.environ.get("API_KEY", "your_api_key")
BASE_URL = "https://api.example.com/v1"
async def fetch_one(session: aiohttp.ClientSession,
                    endpoint: str, payload: dict) -> dict:
    """POST ``payload`` to ``BASE_URL + endpoint`` and wrap the outcome.

    On success returns ``{"endpoint", "status": "ok", "data"}`` where ``data``
    is the decoded JSON body. If sending the request, the HTTP status check
    (``raise_for_status``) or JSON decoding raises an ``Exception``, returns
    ``{"endpoint", "status": "error", "message": str(exc)}`` instead.
    """
    target = BASE_URL + endpoint
    auth_headers = {"Authorization": f"Bearer {API_KEY}"}
    try:
        async with session.post(
            target,
            json=payload,
            headers=auth_headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as response:
            response.raise_for_status()
            body = await response.json()
    except Exception as exc:
        return {"endpoint": endpoint, "status": "error", "message": str(exc)}
    return {"endpoint": endpoint, "status": "ok", "data": body}
async def fetch_all(requests: list) -> list:
    """Send all requests concurrently and wait for every one to finish.

    Args:
        requests: list of ``(endpoint, payload)`` tuples.

    Returns:
        One result dict per request, in the same order as ``requests``.
        ``fetch_one`` normally turns failures into error dicts itself; any
        exception that still escapes it (for example a non-``str`` endpoint
        breaking ``BASE_URL + endpoint``, which runs outside its ``try``) is
        converted here into the same ``{"endpoint", "status": "error",
        "message"}`` shape, so the returned list is always JSON-serializable.
    """
    # Materialize once: the pairs are iterated twice (to launch the requests
    # and to label any failures), which a one-shot iterator would not allow.
    pairs = list(requests)
    async with aiohttp.ClientSession() as session:
        # return_exceptions=True: an escaped exception is returned as a value
        # instead of making gather raise and discard the other results.
        outcomes = await asyncio.gather(
            *(fetch_one(session, endpoint, payload)
              for endpoint, payload in pairs),
            return_exceptions=True,
        )
    # gather() yields raw exception objects for failed calls; json.dumps in
    # main() cannot serialize those, so normalize them to error dicts.
    return [
        {"endpoint": endpoint, "status": "error", "message": str(outcome)}
        if isinstance(outcome, BaseException)
        else outcome
        for (endpoint, _payload), outcome in zip(pairs, outcomes)
    ]
def main():
    """Fire the three demo API calls concurrently and print a timed summary."""
    # The calls to issue in parallel: (endpoint, JSON payload).
    batch = [
        ("/summarize", {"text": "runoob 是技术学习平台", "lang": "zh"}),
        ("/keywords", {"text": "RUNOOB 提供各类编程教程", "top_n": 5}),
        ("/sentiment", {"text": "这个平台的教程质量很高"}),
    ]

    start = time.perf_counter()
    results = asyncio.run(fetch_all(batch))
    duration = time.perf_counter() - start

    summary = f"并发完成 {len(results)} 个请求,总耗时 {duration:.2f} 秒"
    print(summary)
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
# 异步并发调用多个 API,汇总结果
import asyncio
import aiohttp # pip install aiohttp --break-system-packages
import json
import sys
import time
import os

# Credentials and endpoint for the example API. The key is read from the
# API_KEY environment variable so a real secret never has to be written into
# the script; the literal is only a placeholder fallback for the demo.
API_KEY = os.environ.get("API_KEY", "your_api_key")
BASE_URL = "https://api.example.com/v1"
async def fetch_one(session: aiohttp.ClientSession,
                    endpoint: str, payload: dict) -> dict:
    """POST ``payload`` to ``BASE_URL + endpoint`` and wrap the outcome.

    On success returns ``{"endpoint", "status": "ok", "data"}`` where ``data``
    is the decoded JSON body. If sending the request, the HTTP status check
    (``raise_for_status``) or JSON decoding raises an ``Exception``, returns
    ``{"endpoint", "status": "error", "message": str(exc)}`` instead.
    """
    target = BASE_URL + endpoint
    auth_headers = {"Authorization": f"Bearer {API_KEY}"}
    try:
        async with session.post(
            target,
            json=payload,
            headers=auth_headers,
            timeout=aiohttp.ClientTimeout(total=30),
        ) as response:
            response.raise_for_status()
            body = await response.json()
    except Exception as exc:
        return {"endpoint": endpoint, "status": "error", "message": str(exc)}
    return {"endpoint": endpoint, "status": "ok", "data": body}
async def fetch_all(requests: list) -> list:
    """Send all requests concurrently and wait for every one to finish.

    Args:
        requests: list of ``(endpoint, payload)`` tuples.

    Returns:
        One result dict per request, in the same order as ``requests``.
        ``fetch_one`` normally turns failures into error dicts itself; any
        exception that still escapes it (for example a non-``str`` endpoint
        breaking ``BASE_URL + endpoint``, which runs outside its ``try``) is
        converted here into the same ``{"endpoint", "status": "error",
        "message"}`` shape, so the returned list is always JSON-serializable.
    """
    # Materialize once: the pairs are iterated twice (to launch the requests
    # and to label any failures), which a one-shot iterator would not allow.
    pairs = list(requests)
    async with aiohttp.ClientSession() as session:
        # return_exceptions=True: an escaped exception is returned as a value
        # instead of making gather raise and discard the other results.
        outcomes = await asyncio.gather(
            *(fetch_one(session, endpoint, payload)
              for endpoint, payload in pairs),
            return_exceptions=True,
        )
    # gather() yields raw exception objects for failed calls; json.dumps in
    # main() cannot serialize those, so normalize them to error dicts.
    return [
        {"endpoint": endpoint, "status": "error", "message": str(outcome)}
        if isinstance(outcome, BaseException)
        else outcome
        for (endpoint, _payload), outcome in zip(pairs, outcomes)
    ]
def main():
    """Fire the three demo API calls concurrently and print a timed summary."""
    # The calls to issue in parallel: (endpoint, JSON payload).
    batch = [
        ("/summarize", {"text": "runoob 是技术学习平台", "lang": "zh"}),
        ("/keywords", {"text": "RUNOOB 提供各类编程教程", "top_n": 5}),
        ("/sentiment", {"text": "这个平台的教程质量很高"}),
    ]

    start = time.perf_counter()
    results = asyncio.run(fetch_all(batch))
    duration = time.perf_counter() - start

    summary = f"并发完成 {len(results)} 个请求,总耗时 {duration:.2f} 秒"
    print(summary)
    print(json.dumps(results, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    main()
并发完成 3 个请求,总耗时 1.23 秒
(以上为示例输出;同样的 3 个请求若同步串行执行约需 3.5 秒,并发节省约 65% 的时间)
异步读取多个文件
对于需要同时读取多个文件的场景,aiofiles 提供了文件 I/O 的异步接口(其内部把阻塞的文件操作委托给线程池执行)。
实例
# 文件路径:scripts/async_file_reader.py
import asyncio
import aiofiles # pip install aiofiles --break-system-packages
import os
async def read_file(file_path: str) -> dict:
    """Read one UTF-8 text file asynchronously and summarize it.

    On success returns ``{"file", "chars", "lines", "status": "ok"}``:
    ``file`` is the base name, ``chars`` the number of decoded characters
    and ``lines`` the number of lines, counting a final line that has no
    trailing newline. If opening, reading or decoding raises an
    ``Exception``, returns ``{"file": <full path>, "status": "error",
    "message": str(e)}``.
    """
    try:
        async with aiofiles.open(file_path, encoding="utf-8") as f:
            content = await f.read()
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}
    # count("\n") alone misses a last line without a trailing newline
    # ("a\nb" is 2 lines, "hello" is 1), so add that line explicitly.
    line_count = content.count("\n")
    if content and not content.endswith("\n"):
        line_count += 1
    return {
        "file": os.path.basename(file_path),
        "chars": len(content),
        "lines": line_count,
        "status": "ok",
    }
async def read_all_files(file_paths: list) -> list:
    """Read every path concurrently via ``read_file``.

    Returns one summary dict per path, in the same order as ``file_paths``.
    """
    return await asyncio.gather(*map(read_file, file_paths))
if __name__ == "__main__":
    import glob
    import json

    # Gather every .txt file in the upload directory and summarize them all.
    files = glob.glob("/mnt/user-data/uploads/*.txt")
    if files:
        results = asyncio.run(read_all_files(files))
        print(json.dumps(results, ensure_ascii=False, indent=2))
    else:
        print("未找到 .txt 文件")
import asyncio
import aiofiles # pip install aiofiles --break-system-packages
import os
async def read_file(file_path: str) -> dict:
    """Read one UTF-8 text file asynchronously and summarize it.

    On success returns ``{"file", "chars", "lines", "status": "ok"}``:
    ``file`` is the base name, ``chars`` the number of decoded characters
    and ``lines`` the number of lines, counting a final line that has no
    trailing newline. If opening, reading or decoding raises an
    ``Exception``, returns ``{"file": <full path>, "status": "error",
    "message": str(e)}``.
    """
    try:
        async with aiofiles.open(file_path, encoding="utf-8") as f:
            content = await f.read()
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}
    # count("\n") alone misses a last line without a trailing newline
    # ("a\nb" is 2 lines, "hello" is 1), so add that line explicitly.
    line_count = content.count("\n")
    if content and not content.endswith("\n"):
        line_count += 1
    return {
        "file": os.path.basename(file_path),
        "chars": len(content),
        "lines": line_count,
        "status": "ok",
    }
async def read_all_files(file_paths: list) -> list:
    """Read every path concurrently via ``read_file``.

    Returns one summary dict per path, in the same order as ``file_paths``.
    """
    return await asyncio.gather(*map(read_file, file_paths))
if __name__ == "__main__":
    import glob
    import json

    # Gather every .txt file in the upload directory and summarize them all.
    files = glob.glob("/mnt/user-data/uploads/*.txt")
    if files:
        results = asyncio.run(read_all_files(files))
        print(json.dumps(results, ensure_ascii=False, indent=2))
    else:
        print("未找到 .txt 文件")
超时控制与任务取消
异步任务中,超时控制尤为重要。asyncio.wait_for 可以为单个任务设置最大等待时间:一旦超时,它会取消该任务并抛出 asyncio.TimeoutError,调用方捕获该异常后即可继续处理其他任务。
实例
# 文件路径:scripts/async_timeout.py
import asyncio
async def slow_task(name: str, delay: float) -> str:
    """Simulate a slow operation: sleep ``delay`` seconds, then report done."""
    await asyncio.sleep(delay)
    return f"{name} 完成"


async def run_with_timeout(timeout: float = 2.0):
    """Run the demo tasks concurrently, giving each at most ``timeout`` seconds.

    Each task is wrapped in its own ``asyncio.wait_for`` and all of them are
    awaited together with ``asyncio.gather``. Total time is therefore about
    the longest ``min(delay, timeout)``, not the sum of the per-task waits.

    Args:
        timeout: per-task limit in seconds (default 2.0, as in the example).

    Returns:
        A dict mapping each task name (in definition order) to either
        ``{"status": "ok", "result": ...}`` or
        ``{"status": "timeout", "message": "超过 <timeout> 秒未完成"}``.
    """
    tasks = {
        "快速任务": slow_task("快速任务", 0.5),
        "慢速任务": slow_task("慢速任务", 5.0),
    }

    async def _guarded(coro):
        # wait_for cancels the wrapped coroutine when the deadline passes
        # and raises asyncio.TimeoutError, which becomes a "timeout" entry.
        try:
            result = await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            # :g renders 2.0 as "2", keeping the original message text.
            return {"status": "timeout", "message": f"超过 {timeout:g} 秒未完成"}
        return {"status": "ok", "result": result}

    # Start every guarded task at once; gather preserves input order, so
    # zipping with the dict keys pairs each outcome with its task name.
    outcomes = await asyncio.gather(*(_guarded(c) for c in tasks.values()))
    return dict(zip(tasks, outcomes))
if __name__ == "__main__":
    import json

    # Execute the demo once and dump each task's outcome as pretty JSON.
    report = asyncio.run(run_with_timeout())
    print(json.dumps(report, indent=2, ensure_ascii=False))
import asyncio
async def slow_task(name: str, delay: float) -> str:
    """Simulate a slow operation: sleep ``delay`` seconds, then report done."""
    await asyncio.sleep(delay)
    return f"{name} 完成"


async def run_with_timeout(timeout: float = 2.0):
    """Run the demo tasks concurrently, giving each at most ``timeout`` seconds.

    Each task is wrapped in its own ``asyncio.wait_for`` and all of them are
    awaited together with ``asyncio.gather``. Total time is therefore about
    the longest ``min(delay, timeout)``, not the sum of the per-task waits.

    Args:
        timeout: per-task limit in seconds (default 2.0, as in the example).

    Returns:
        A dict mapping each task name (in definition order) to either
        ``{"status": "ok", "result": ...}`` or
        ``{"status": "timeout", "message": "超过 <timeout> 秒未完成"}``.
    """
    tasks = {
        "快速任务": slow_task("快速任务", 0.5),
        "慢速任务": slow_task("慢速任务", 5.0),
    }

    async def _guarded(coro):
        # wait_for cancels the wrapped coroutine when the deadline passes
        # and raises asyncio.TimeoutError, which becomes a "timeout" entry.
        try:
            result = await asyncio.wait_for(coro, timeout=timeout)
        except asyncio.TimeoutError:
            # :g renders 2.0 as "2", keeping the original message text.
            return {"status": "timeout", "message": f"超过 {timeout:g} 秒未完成"}
        return {"status": "ok", "result": result}

    # Start every guarded task at once; gather preserves input order, so
    # zipping with the dict keys pairs each outcome with its task name.
    outcomes = await asyncio.gather(*(_guarded(c) for c in tasks.values()))
    return dict(zip(tasks, outcomes))
if __name__ == "__main__":
    import json

    # Execute the demo once and dump each task's outcome as pretty JSON.
    report = asyncio.run(run_with_timeout())
    print(json.dumps(report, indent=2, ensure_ascii=False))
{
"快速任务": {"status": "ok", "result": "快速任务 完成"},
"慢速任务": {"status": "timeout", "message": "超过 2 秒未完成"}
}
在 SKILL.md 中说明异步脚本的调用方式
## 并发 API 调用

当用户提供多个文本需要同时分析时,运行异步脚本以提升速度:

```bash
python scripts/async_requests.py
```

该脚本会并发发出所有请求,总耗时约等于最慢的单次请求,而非所有请求的时间之和。
