现在位置: 首页 > Skills 教程 > 正文

Skills 异步与并发处理

当 Skill 需要同时等待多个耗时操作(如网络请求、文件 I/O)时,异步编程能大幅减少总等待时间。

本篇介绍 Python asyncio 在 Skill 脚本中的应用场景与写法。


同步 vs 异步:何时该用异步

场景推荐方式原因
单个文件处理、单次 API 调用同步异步收益不明显,代码更简单
同时发出多个 HTTP 请求异步并发等待,总时间 ≈ 最长单次时间
同时读取多个文件异步或线程池I/O 等待时可切换任务
CPU 密集型计算(如图像处理)多进程asyncio 不绕过 GIL

异步不等于并行。Python 的 asyncio 是单线程的事件循环,适合 I/O 密集型任务(等待网络、磁盘)。CPU 密集型任务应使用 ProcessPoolExecutor


异步发送多个 HTTP 请求

假设 Skill 需要同时调用 3 个 API 接口,同步方式总耗时是三次请求之和,异步方式总耗时约等于最慢的那一次。

实例

# 文件路径:scripts/async_requests.py
# 异步并发调用多个 API,汇总结果

import asyncio
import aiohttp      # pip install aiohttp --break-system-packages
import json
import sys
import time

API_KEY  = "your_api_key"
BASE_URL = "https://api.example.com/v1"

async def fetch_one(session: aiohttp.ClientSession,
                    endpoint: str, payload: dict) -> dict:
    """异步发出单个 POST 请求"""
    url     = BASE_URL + endpoint
    headers = {"Authorization": f"Bearer {API_KEY}"}
    try:
        async with session.post(url, json=payload,
                                headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
            resp.raise_for_status()
            return {"endpoint": endpoint, "status": "ok", "data": await resp.json()}
    except Exception as e:
        return {"endpoint": endpoint, "status": "error", "message": str(e)}

async def fetch_all(requests: list) -> list:
    """
    并发发送所有请求,等待全部完成

    参数:
        requests: 列表,每项为 (endpoint, payload) 元组
    """

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_one(session, endpoint, payload)
            for endpoint, payload in requests
        ]
        # gather 并发执行所有 task,return_exceptions=True 防止一个失败影响其他
        return await asyncio.gather(*tasks, return_exceptions=True)

def main():
    # 定义需要并发调用的请求列表
    requests = [
        ("/summarize", {"text": "runoob 是技术学习平台", "lang": "zh"}),
        ("/keywords",  {"text": "RUNOOB 提供各类编程教程", "top_n": 5}),
        ("/sentiment", {"text": "这个平台的教程质量很高"}),
    ]

    t0      = time.perf_counter()
    results = asyncio.run(fetch_all(requests))
    elapsed = time.perf_counter() - t0

    print(f"并发完成 {len(results)} 个请求,总耗时 {elapsed:.2f} 秒")
    print(json.dumps(results, ensure_ascii=False, indent=2))

if __name__ == "__main__":
    main()
并发完成 3 个请求,总耗时 1.23 秒
(同步串行约需 3.5 秒,并发节省约 65% 时间)

异步读取多个文件

对于需要同时读取多个文件的场景,aiofiles 提供了文件 I/O 的异步版本。

实例

# 文件路径:scripts/async_file_reader.py
import asyncio
import aiofiles   # pip install aiofiles --break-system-packages
import os

async def read_file(file_path: str) -> dict:
    """异步读取单个文件"""
    try:
        async with aiofiles.open(file_path, encoding="utf-8") as f:
            content = await f.read()
        return {
            "file":    os.path.basename(file_path),
            "chars":   len(content),
            "lines":   content.count("\n"),
            "status":  "ok"
        }
    except Exception as e:
        return {"file": file_path, "status": "error", "message": str(e)}

async def read_all_files(file_paths: list) -> list:
    """并发读取所有文件"""
    tasks = [read_file(fp) for fp in file_paths]
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    import glob, json
    # 读取上传目录中的所有 .txt 文件
    files = glob.glob("/mnt/user-data/uploads/*.txt")
    if not files:
        print("未找到 .txt 文件")
    else:
        results = asyncio.run(read_all_files(files))
        print(json.dumps(results, ensure_ascii=False, indent=2))

超时控制与任务取消

异步任务中,超时控制尤为重要。asyncio.wait_for 可以为单个任务设置最大等待时间。

实例

# 文件路径:scripts/async_timeout.py
import asyncio

async def slow_task(name: str, delay: float) -> str:
    """模拟一个耗时任务"""
    await asyncio.sleep(delay)
    return f"{name} 完成"

async def run_with_timeout():
    tasks = {
        "快速任务": slow_task("快速任务", 0.5),
        "慢速任务": slow_task("慢速任务", 5.0),
    }
    results = {}
    for name, coro in tasks.items():
        try:
            # 每个任务最多等待 2 秒
            result = await asyncio.wait_for(coro, timeout=2.0)
            results[name] = {"status": "ok", "result": result}
        except asyncio.TimeoutError:
            results[name] = {"status": "timeout", "message": "超过 2 秒未完成"}
    return results

if __name__ == "__main__":
    import json
    output = asyncio.run(run_with_timeout())
    print(json.dumps(output, ensure_ascii=False, indent=2))
{
  "快速任务": {"status": "ok",      "result": "快速任务 完成"},
  "慢速任务": {"status": "timeout", "message": "超过 2 秒未完成"}
}

在 SKILL.md 中说明异步脚本的调用方式

## 并发 API 调用

当用户提供多个文本需要同时分析时,运行异步脚本以提升速度:

```bash
python scripts/async_requests.py
```

该脚本会并发发出所有请求,总耗时约等于最慢的单次请求,
而非所有请求的时间之和。