skills/consume-nats-message/SKILL.md
Drain pending NATS messages from a producer contract via NATS MCP tools (default batch / drain-style). Applies Tolerant Reader semantics and per-message ack/nak/term, returning aggregated stats. Reads project-level cache (.cortex/nats.yaml) to avoid re-prompting.
npx skillsauth add nesnilnehc/ai-cortex consume-nats-messageInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
让相互协作的项目能"正确消费消息"——按 producer 契约通过 NATS MCP 工具批量拉取待处理消息,逐条按 Tolerant Reader 处理并正确 ack。默认 drain-style:一次调用拉至队列空或达上限,单条失败不阻断整批,最终返回聚合结果。
首要目标:drain producer 契约对应 subject 的待处理消息,每条按 Tolerant Reader 解码 → 业务处理 → 正确 ack / nak / term,输出聚合统计。
成功标准:
.cortex/nats.yaml 已读取contract_version 兼容性检查;契约缺失时 Bootstrap 已生成 draft 草稿并经用户确认drained / cap_reached / idle_timeout 之一验收测试:consumer 处理一批含 1 条新增可选字段的消息时,不抛错并正确 ack;含 1 条业务不可恢复错误的消息时,term + DLQ 投递;含 1 条临时失败的消息时,nak 让 broker 重投。
本技能负责:
本技能不负责:
mcp__nats__jetstream_pull_consumer / mcp__nats__ack / mcp__nats__nak / mcp__nats__term / mcp__nats__publish 等工具;具体工具名以连接的 server 为准)"我正在使用 consume-nats-message 技能 drain 一批待处理消息。"
.cortex/nats.yaml(consumer repo 根目录)broker_url / service_source / durable_name_prefix / dlq_handler_dir / vendor_contracts_dir / consume_defaultsconsume_defaults(max_messages / batch_size / fetch_timeout / idle_threshold)按优先级:
<vendor_contracts_dir>/<producer>/<event>-contract.md(冻结快照,含 @version 锚定)@version)读取契约:
clarification.session.requested.v1)目的:契约缺失是协作初期的常态,本阶段以"只读探查 + 推断"方式生成草稿契约,让用户在零先决条件下也能合理启动。全程不 ack、不改 consumer offset、不修改 broker 状态。
前置约束:
status: draft 的契约草稿,用户审阅确认前不进入阶段 5 的正式 drain步骤:
列举 broker 拓扑(peek,不订阅)
producer 名匹配的 stream(如 producer=zentao → ZENTAO stream)排前选定目标 subject
<producer>.<event>.* 模式匹配Peek 样本消息(不创建 durable,不 ack)
bootstrap_sample_size 覆盖)推断 schema
.v1 / .v2 后缀)Nats-Msg-Id / Nats-Stream 等)不进入推断TBD生成草稿契约
<vendor_contracts_dir>/<producer>/<event>-contract.mdcontract_version: 0.1.0
status: draft
inferred_from: bootstrap-peek
inferred_at: <ISO 8601>
inferred_sample_size: <N>
inferred_sample_seq_range: <first>-<last>
authoritative: false
⚠️ 本契约由 consume-nats-message Bootstrap 模式从 N 条样本消息推断生成,非权威。请联系 producer owner 确认字段语义、QoS、DLQ 规则后将
status升为active、authoritative升为true。
.lock 写入 <contract-name>@0.1.0请求用户确认
TBD)Bootstrap 模式下的版本兼容:
contract_version: 0.1.0 视为 pre-stable,阶段 3 跳过 MAJOR/MINOR 兼容检查status: activecontract_version<vendor_contracts_dir>/<producer>/.lock 记录 <contract-name>@<version>).lock 后继续通过 MCP NATS 工具:
<durable_name_prefix>-<event-slug>explicit)processed = 0
last_non_empty_at = now
while True:
if processed >= max_messages:
exit_reason = "cap_reached"; break
batch = fetch(batch_size, fetch_timeout)
if batch is empty:
if now - last_non_empty_at >= idle_threshold:
exit_reason = "drained"; break
continue
last_non_empty_at = now
for msg in batch:
try:
process_one(msg) # 阶段 6
except Exception as e:
record_failure(msg, e) # 不中断整批
processed += 1
退出原因取值:drained / cap_reached / idle_timeout(连续空 fetch + 未达上限)。
每条消息:
Nats-Msg-Id,TTL ≥ broker duplicate_window);命中 → 直接 ack 跳过Nats-Msg-Id / X-Source / X-Type → 视为违规消息(业务不可恢复,走 term 路径);未知 headers 忽略Traceparent 继续 span(如适用)| 业务结果 | MCP 工具 | 副作用 |
|---|---|---|
| 成功 | mcp__nats__ack | broker 确认消费 |
| 可重试错误(网络瞬时 / 下游限流) | mcp__nats__nak | broker 按 backoff 重投;最终达 max_deliver 自动 term |
| 不可恢复错误(schema 违规 / 业务永久失败) | mcp__nats__term + mcp__nats__publish 投 DLQ | 不再重投;原消息复制到 <original>.dlq 含失败原因 header |
DLQ 消息 headers 附加:
X-DLQ-Original-Subject:原 subjectX-DLQ-Reason:失败摘要X-DLQ-Failed-At:ISO 8601 时间戳输出:
exit_reason: drained # drained / cap_reached / idle_timeout
duration: 12.3s
counts:
fetched: 87
acked: 80
naked: 4
termed: 3
dlq_sent: 3
duplicates_skipped: 1
failures:
- msg_id: 01HX...A1
subject: clarification.session.requested.v1
decision: term
reason: missing required payload field 'sessionId'
- msg_id: 01HX...A2
subject: ...
decision: nak
reason: downstream timeout (retry will be attempted)
backlog_hint: |
exit_reason=drained,无需续跑。
(若为 cap_reached,建议再次调用本 Skill 续 drain。)
| 情况 | 处理 |
|---|---|
| MCP NATS server 未连接 | 提示检查 MCP 配置 |
| .cortex/nats.yaml 缺失 | 引导补齐 |
| 契约缺失 / 未 vendor | 进入阶段 2.5 Bootstrap——peek 样本 + 推断 + 生成 draft 草稿,不直接终止 |
| Bootstrap 推断后用户选(b)只看不动 | 草稿落盘后终止,下次调用阶段 2 即可命中 |
| Durable consumer 不存在 | 报错指向 IaC owner(违反 spec §5.5.2) |
| MAJOR 版本漂移 | 终止并提示升级 vendor 快照 |
| 单条解码失败 | term + DLQ + 写入失败清单,不中断整批 |
| 单条业务可重试失败 | nak,broker 自然重投 |
| Fetch 超时(broker 端无可用消息) | 计入空 fetch 计数,触发 idle_threshold 退出 |
status: draft / authoritative: false —— 会被误当权威契约对外引用,污染 SSOTdevelopment
Generate an LLM agent test suite (golden cases, mock-LLM unit tests, evaluator harness) from an agent implementation and its agent-test contract. Use when an agent has no tests, or a contract exists but the test code is missing.
development
After code changes, auto-detect the project's build system and local deployment method for a given directory, then build the project and restart its locally-deployed environment (Docker Compose / systemd / process manager). Never assumes — asks only when detection is ambiguous. Caches detected commands per project in .cortex/redeploy-local.yaml; re-invocations on the same project skip re-scanning until signal files change, the cache expires (30 days), or the skill version bumps.
tools
Publish a NATS message conforming to a cross-team contract, using NATS MCP tools. Authors the contract on first use if missing. Reads project-level cache (.cortex/nats.yaml) to avoid re-prompting basics across sessions.
testing
Iteratively review changes, run automated tests, and apply targeted fixes until issues are resolved (or a stop condition is reached).