plugins/languages/python/skills/async/SKILL.md
Python 异步编程与并发规范 (asyncio 3.13+)。涵盖 async/await、TaskGroup 结构化并发、httpx 异步 HTTP、aiofiles、超时与取消、free-threading (PEP 703)。在写异步函数、并发调度、I/O 优化、迁移同步代码、调试 async bug 时使用。也触发于"asyncio"、"async/await"、"并发"、"httpx"、"TaskGroup"。
npx skillsauth add lazygophers/ccplugin python-asyncInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Python 3.13+, asyncio 优先。3.14 的 free-threading (PEP 703) 解决 CPU 密集, asyncio 仍是 I/O 密集首选。
| 场景 | 选择 |
|------|------|
| I/O 密集 (HTTP, DB, 文件) | async/await |
| CPU 密集 (计算、加密、图像) | asyncio.to_thread 或 free-threading (3.14t) 多线程 |
| 真并行 CPU | 3.13 用 multiprocessing, 3.14 用 free-threading 多线程 |
| 单次脚本, 一两个 I/O | 同步代码 + httpx.Client 即可, 别强上 async |
整个调用链要么全 async 要么全 sync。混合调用 (asyncio.run 嵌套, loop.run_until_complete 在异步上下文里) 会死锁。
import asyncio
import httpx
async def fetch_user(client: httpx.AsyncClient, uid: int) -> dict:
resp = await client.get(f"/users/{uid}")
resp.raise_for_status()
return resp.json()
async def main() -> None:
async with httpx.AsyncClient(base_url="https://api.example.com") as client:
user = await fetch_user(client, 1)
print(user)
asyncio.run(main()) # 进程入口唯一一次
并发首选 TaskGroup, 不用 asyncio.gather (除非确实需要 return_exceptions=True):
async def fetch_all(uids: list[int]) -> list[dict]:
async with httpx.AsyncClient() as client:
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(fetch_user(client, uid)) for uid in uids]
return [t.result() for t in tasks]
TaskGroup 优势:
ExceptionGroup, 用 except* 处理 (见 python-error)async with 前等所有任务完成, 防止任务泄漏gather 仅在需要"全部跑完, 错误单独收集"时用:
results = await asyncio.gather(*tasks, return_exceptions=True)
asyncio.timeout (3.11+) 替代 wait_for:
async with asyncio.timeout(5.0):
data = await slow_operation()
# TimeoutError 在退出时抛出
不要用 wait_for (老 API, 取消语义有坑)。
CancelledError 是控制流而非异常, 不要吞掉:
async def worker():
try:
await do_work()
except asyncio.CancelledError:
await cleanup()
raise # 必须 re-raise
except Exception:
log.error("worker_failed", exc_info=True)
raise
清理用 try/finally 或 async with, 不要依赖捕获 CancelledError。
不要再用 requests / urllib:
# 复用 client (连接池, 性能 10x+)
async with httpx.AsyncClient(
base_url="https://api.example.com",
timeout=httpx.Timeout(10.0, connect=5.0),
limits=httpx.Limits(max_connections=100),
) as client:
resp = await client.get("/users", params={"page": 1})
每次请求都 httpx.AsyncClient() 是反模式 (无连接复用)。把 client 放在 app 生命周期 (FastAPI lifespan) 或 fixture 里。
异步上下文里读写文件:
import aiofiles
async with aiofiles.open("data.json") as f:
content = await f.read()
但小文件直接 Path.read_text() (同步, 几 ms) 比开 aiofiles 还快, 别过度异步化。
CPU 密集任务不要写成 async def, 会阻塞事件循环:
# 同步 CPU 函数
def compute_hash(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
# 在 async 上下文里调
result = await asyncio.to_thread(compute_hash, big_data)
3.14 free-threading (python3.14t) 让多线程真并行, 多核 CPU 任务可用 concurrent.futures.ThreadPoolExecutor, 但生态 (C 扩展兼容) 还在补齐, 生产环境先评估。
sem = asyncio.Semaphore(10) # 同时最多 10 并发
async def bounded_fetch(client, url):
async with sem:
return await client.get(url)
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(bounded_fetch(client, url))
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy import select
engine = create_async_engine("postgresql+asyncpg://...")
async def get_user(session: AsyncSession, uid: int) -> User | None:
stmt = select(User).where(User.id == uid)
result = await session.execute(stmt)
return result.scalar_one_or_none()
不要在 async 代码里用同步 psycopg2 / sqlite3。
asyncio.run(main(), debug=True) 启用 debug 模式, 检测慢回调和未 await 协程PYTHONASYNCIODEBUG=1 环境变量同效果loop.set_slow_callback_duration(0.05) 报警长回调requests / urllib3 / aiohttp (新代码统一 httpx)asyncio.get_event_loop() (用 asyncio.get_running_loop() 或不用)time.sleep() (用 await asyncio.sleep())asyncio.run(...) 多次嵌套 (整个程序只一次)RuntimeWarning: coroutine 'x' was never awaited)to_threadfor x in tasks: await x (串行, 失去并发, 用 TaskGroup)CancelledErrortools
--- name: trellisx-workspace description: 维护 `.trellis/task.md` 任务看板 —— trellis 缺的跨任务总览。**一个表格, 一行一个任务**, 列为 id/名称/描述/状态/阶段/进度/worktree (状态/阶段中文显示)。在 task create/start/阶段切换/archive 后**及时更新**对应行; 并**自动清理超 7 天的已完成行**防膨胀。保持看板与 task.json 实时一致。 when_to_use: 维护 / 创建 / 更新 `.trellis/task.md` 任务看板时; task 生命周期任一节点 (create/start/阶段推进/archive) 之后同步看板时; 用户问"当前有哪些任务 / 任务进度 / 任务看板"时。被 trellisx-flow 与 trellisx-apply 注入的流程引用。 user-invocable: true argument-hint: [show|update|sync|cleanup ...] [task id] arguments:
testing
强制以 Trellis task 闭环处理用户指定的请求 (自判新建/并入 → plan→exec→check→finish 全程不跳步)。**仅用户显式主动调用** (/trellisx-flow 或明确要求"强制走 task 处理这个"); **禁止自动 / 被动 / 推断式调用** —— 不要因为某个请求"看起来该建 task"就自动触发本 skill, 那是 apply 注入的 no_task 倾向的职责。
testing
把 强推task + subtask拆分 + worktree隔离 + 闭环收尾 四维度增量注入当前项目 .trellis/ (workflow.md 的 no_task/planning/in_progress 块 + spec 背书文档 + trellis 生命周期 hook worktree 自动化)。强推 task 与闭环为纯 prompt 软约束 (非平台 hook 硬拦截)。**纯增量追加, 绝不替换 trellis 原生文本** (no_task 分类+征同意/check/finish/前缀全保留)。幂等 (marker 包裹)。
development
Claude Code 会话历史整理 — 扫 ~/.claude/projects/**/*.jsonl 全部 session transcripts, 提取学习增量 (用户校正/决策/踩坑/L0 规则) → 全局记忆库 ~/.cortex/.wiki/memory/. 默认 --apply 落盘 (--dry-run opt-in 仅出 JSON plan 预览). 与 cortex-extract (L4-inbox 内部) 互补.