skills/inngest-realtime/SKILL.md
Use when streaming durable workflow updates to a UI in real time — live order status pages that animate as steps complete, AI agent token streaming from a function to the browser, log tailing for long-running jobs, or human-in-the-loop approval flows that publish a prompt and wait for a user reply. Covers Inngest v4 native realtime: defining typed channels, publishing from inside step.run, minting subscription tokens via server actions, and consuming the stream from React/Next.js client components.
npx skillsauth add inngest/inngest-skills inngest-realtimeInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Stream updates from durable Inngest functions to live UIs. Use channels and topics to broadcast progress, render workflow execution as it happens, or build bi-directional human-in-the-loop flows.
These skills are focused on TypeScript. For Python or Go, refer to the Inngest documentation for language-specific guidance. Core concepts apply across all languages.
⚠ CRITICAL: v3 vs v4 package selection
Realtime in Inngest v4 lives at the SDK subpath
inngest/realtime. The standalone@inngest/realtimenpm package is a v3-era package and is NOT compatible with[email protected]. If your project is on v4 (the npm default), do not install@inngest/realtime. Use the imports below.Symptoms of using the wrong package on v4:
TypeError: Cls is not a constructoron everyPUT /api/inngest, 401 on subscription tokens, type incompatibility onnew Inngest({ middleware: [...] }). Verify yourpackage.jsonshows"inngest": "^4.x"before reading further.
npm install inngest) — see the inngest-setup skillINNGEST_DEV=1 set in .env.local for local development (without it, the SDK demands cloud signing keys and 401s on token requests)npx inngest-cli@latest dev)zod for schema validation on topics| Problem shape | Pattern | |---|---| | Order status page animates as durable workflow steps complete | Per-run channel, publish per step, client subscribes | | AI agent streams tokens to a chat UI | Per-conversation channel, publish chunks, stream to browser | | Log tail for a long-running job | Single channel, log topic, append to UI | | Human-in-the-loop approval | Channel + waitForEvent, publish prompt, wait for response | | Admin dashboard with live order list | Global admin channel, fan-out from each function |
Three pieces:
step.realtime.publish between steps to wrap a durable publish, or inngest.realtime.publish inside step.run because you're already inside a memoized step. See "Which publish method to use" below.useRealtime hook (or the lower-level subscribe() API for non-React consumers).Channels are pure data — no class hierarchy, no zod runtime required (but recommended for type safety). Define them once and import where needed.
// src/inngest/channels.ts
import { channel } from 'inngest/realtime';
import { z } from 'zod';
// Per-run channel: each fulfill-order run publishes step updates to its own channel.
export const orderChannel = channel({
name: (orderId: string) => `order:${orderId}`,
topics: {
step: {
schema: z.object({
name: z.string(),
status: z.enum(['running', 'complete', 'failed']),
output: z.record(z.string(), z.unknown()).optional(),
ts: z.number(),
}),
},
},
});
// Global admin channel: fan-out for cross-cutting visibility.
export const adminChannel = channel({
name: 'admin',
topics: {
order: {
schema: z.object({
orderId: z.string(),
step: z.string(),
status: z.enum(['running', 'complete', 'failed']),
ts: z.number(),
}),
},
},
});
Two channel name shapes:
name: 'admin' — static channel, accessed as adminChannel.order (topic ref)name: (id) => 'channel:${id}' — parametric, accessed as orderChannel(id).step (call the channel def with the id, then access topic)Inngest v4 ships realtime support natively — no middleware required. But where you call publish matters: it determines whether the publish is durable, and it's the most common place to get realtime wrong.
| Where you are | Use this | Why |
|---|---|---|
| Outside a step (top-level handler code, between step.run calls) | step.realtime.publish(id, topicRef, data) | Wraps the publish in its own step so it's durable, deduplicated by id, and retry-safe. |
| Inside a step (inside the callback passed to step.run) | inngest.realtime.publish(topicRef, data) | You're already inside a memoized step. step.realtime.publish would create a step inside a step. The bare client publish is the right call here. |
| Outside a function (one-off route, script, etc.) | inngest.realtime.publish(topicRef, data) | Allowed, but not retry-safe — your client receiver must handle duplicates. |
The 90% rule: if you're writing handler code and you reach for publish, use step.realtime.publish. If you're writing code inside a step.run block and you reach for publish, use inngest.realtime.publish.
// src/inngest/functions/fulfill-order.ts
import { inngest } from '../client';
import { orderChannel, adminChannel } from '../channels';
export const fulfillOrder = inngest.createFunction(
{
id: 'fulfill-order',
retries: 3,
triggers: [{ event: 'store/order.placed' }],
},
async ({ event, step }) => {
const { orderId, customerEmail, lineItems } = event.data;
// Outside any step.run — use step.realtime.publish for a durable wrapper.
const emit = async (
name: string,
status: 'running' | 'complete' | 'failed',
output?: Record<string, unknown>,
) => {
const ts = Date.now();
await step.realtime.publish(
`emit-order-${name}-${status}`,
orderChannel(orderId).step,
{ name, status, output, ts },
);
await step.realtime.publish(
`emit-admin-${name}-${status}`,
adminChannel.order,
{ orderId, step: name, status, ts },
);
};
await emit('capture-payment', 'running');
// Inside step.run — use inngest.realtime.publish (already in a memoized step).
const payment = await step.run('capture-payment', async () => {
const intent = await stripe.paymentIntents.create({ /* ... */ });
// Stream a partial update mid-step. No step-in-step wrapping needed.
await inngest.realtime.publish(orderChannel(orderId).step, {
name: 'capture-payment',
status: 'running',
output: { stage: 'intent-created', intentId: intent.id },
ts: Date.now(),
});
return await stripe.paymentIntents.confirm(intent.id);
});
await emit('capture-payment', 'complete', payment);
await emit('reserve-inventory', 'running');
const inventory = await step.run('reserve-inventory', async () => {
// ...
});
await emit('reserve-inventory', 'complete', inventory);
// ...
},
);
Why no middleware: Earlier versions used @inngest/realtime's realtimeMiddleware() to inject a publish arg into the handler. v4 puts it on step.realtime and inngest.realtime directly.
In Next.js App Router, use a Server Action to securely mint a short-lived token for the React hook in Step 4. Without a token, clients can't subscribe.
// src/app/orders/[orderId]/actions.ts
'use server';
import { getClientSubscriptionToken } from 'inngest/react';
import { inngest } from '@/inngest/client';
import { orderChannel } from '@/inngest/channels';
export async function fetchOrderSubscriptionToken(orderId: string) {
// ⚠ AUTHORIZATION GATE: verify the current user owns this orderId
// before minting a token. Channels are addressable by ID, so without
// an ownership check, anyone can subscribe to any order's stream by
// guessing IDs.
//
// const session = await getServerSession();
// if (!session) throw new Error('Unauthenticated');
// const order = await db.order.findUnique({ where: { id: orderId } });
// if (order?.userId !== session.userId) throw new Error('Forbidden');
return getClientSubscriptionToken(inngest, {
channel: orderChannel(orderId),
topics: ['step'],
});
}
getClientSubscriptionToken from inngest/react returns a token shape that the useRealtime hook in Step 4 consumes directly. No ChannelInstance stripping needed — that gotcha only applies to the lower-level getSubscriptionToken + manual subscribe() path (see "Pattern: Manual subscribe" below).
useRealtime hookThe recommended consumer for React/Next.js is the useRealtime hook from inngest/react. It handles the subscription lifecycle, reconnect, type narrowing per topic, and cleanup.
// src/components/OrderStatusClient.tsx
'use client';
import { useRealtime } from 'inngest/react';
import { orderChannel } from '@/inngest/channels';
import { fetchOrderSubscriptionToken } from '@/app/orders/[orderId]/actions';
export function OrderStatusClient({ orderId }: { orderId: string }) {
const { messages, connectionStatus, error } = useRealtime({
channel: orderChannel(orderId),
topics: ['step'] as const,
token: () => fetchOrderSubscriptionToken(orderId),
});
if (error) return <div>Error: {error.message}</div>;
return (
<div>
<div>Status: {connectionStatus}</div>
<ul>
{messages.all.map((m, i) => (
<li key={i}>
{(m.data as { name: string }).name}: {(m.data as { status: string }).status}
</li>
))}
</ul>
</div>
);
}
Useful options on the hook:
| Option | Default | Use it when |
|---|---|---|
| enabled | true | Delay the subscription until you have an ID (e.g., enabled: !!runId). |
| bufferInterval | 0 | Batch updates from a fast stream so React doesn't re-render per message. |
| pauseOnHidden | false | Pause the stream when the tab isn't visible (saves bandwidth). |
| autoCloseOnTerminal | true | Disconnect when the run completes — turn off to keep the stream open for fan-out channels. |
| historyLimit | unbounded | Cap how many messages are retained in messages.all. |
The hook returns messages.byTopic (latest per topic), messages.all (full history), messages.last (most recent), and messages.delta (new since last render).
The useRealtime hook covers the React case. If you're not using React, or you need a custom subscription lifecycle (server-side streaming, background workers, custom protocols), use the lower-level subscribe() API directly.
// src/app/orders/[orderId]/actions.ts
'use server';
import { getSubscriptionToken } from 'inngest/realtime';
import { inngest } from '@/inngest/client';
import { orderChannel } from '@/inngest/channels';
export async function fetchOrderSubscriptionTokenLowLevel(orderId: string) {
// ⚠ AUTHORIZATION GATE: same as Step 3 — verify ownership before minting.
const token = await getSubscriptionToken(inngest, {
channel: orderChannel(orderId),
topics: ['step'],
});
// ⚠ CRITICAL: strip the ChannelInstance from the response.
// getSubscriptionToken returns { channel: ChannelInstance, ... } where
// ChannelInstance contains zod schema methods (a class with prototypes).
// Next.js refuses to serialize classes across the server-action → client-component
// boundary, so return ONLY primitives.
return {
channel: orderChannel(orderId).name as string,
topics: ['step'] as const,
key: token.key,
apiBaseUrl: token.apiBaseUrl,
};
}
// src/components/OrderStatusManual.tsx
'use client';
import * as React from 'react';
import { subscribe } from 'inngest/realtime';
import { fetchOrderSubscriptionTokenLowLevel } from '@/app/orders/[orderId]/actions';
export function OrderStatusManual({ orderId }: { orderId: string }) {
const [messages, setMessages] = React.useState<unknown[]>([]);
React.useEffect(() => {
let cancelled = false;
let sub: { close?: (reason?: string) => void } | undefined;
(async () => {
const token = await fetchOrderSubscriptionTokenLowLevel(orderId);
if (cancelled) return;
sub = await subscribe(
{
channel: token.channel,
topics: [...token.topics],
key: token.key,
apiBaseUrl: token.apiBaseUrl,
},
(message) => {
if (cancelled) return;
setMessages((prev) => [...prev, message.data]);
},
);
})();
return () => {
cancelled = true;
sub?.close?.('unmount');
};
}, [orderId]);
// ... render ...
}
Subscribe inside a Next.js API route and pipe the stream to the client via SSE:
// src/app/api/orders/[orderId]/stream/route.ts
import { inngest } from '@/inngest/client';
import { subscribe } from 'inngest/realtime';
import { orderChannel } from '@/inngest/channels';
export async function GET(req: Request, { params }: { params: { orderId: string } }) {
// ⚠ AUTHORIZATION GATE: same rule as the server-action token mint above.
// Authenticate the request and confirm the caller owns params.orderId
// before opening the SSE stream. Skipping this leaks every order's
// step events to anyone with a URL.
const stream = await subscribe({
app: inngest,
channel: orderChannel(params.orderId),
topics: ['step'],
});
return new Response(stream.getEncodedStream(), {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}
Client consumes via fetch().getReader() rather than the subscribe() callback. Use this when you want the SSE behavior or when the client-side subscribe() API doesn't fit your component lifecycle.
Combine step.realtime.publish with step.waitForEvent:
import crypto from 'crypto';
export const reviewWorkflow = inngest.createFunction(
{ id: 'review-workflow', triggers: [{ event: 'review/start' }] },
async ({ event, step }) => {
const confirmationId = await step.run('gen-id', () => crypto.randomUUID());
// Publish a prompt — the client subscribes and renders an approval UI
await step.realtime.publish(
'publish-prompt',
reviewChannel.message,
{ message: 'Confirm to proceed?', confirmationId },
);
// Wait up to 15 minutes for the user to send the matching event back
const confirmation = await step.waitForEvent('await-confirmation', {
event: 'review/confirmation',
timeout: '15m',
if: `async.data.confirmationId == "${confirmationId}"`,
});
if (!confirmation) {
// user didn't respond — abort or escalate
return { decision: 'timed_out' };
}
// continue workflow...
},
);
The confirmationId links the published prompt to the matching reply, so the workflow knows which response to act on.
@inngest/realtime on v4The standalone @inngest/realtime package is for Inngest v3 only. On v4, all realtime APIs are in the SDK subpath inngest/realtime. Mixing them produces:
TypeError: Cls is not a constructor on PUT /api/inngest (v3 middleware class signature mismatch)Verify with: grep '"inngest"' package.json — if it's ^4.x, use inngest/realtime. Period.
getSubscriptionToken returns { channel: ChannelInstance, ... } where ChannelInstance has zod schema methods (a class). Next.js refuses to serialize classes across the server-action → client-component boundary. Strip to primitives before returning. See "Pattern: Manual subscribe" above.
This gotcha does not apply when you use getClientSubscriptionToken from inngest/react (Step 3 — the recommended path). That helper returns a serialization-safe shape directly.
INNGEST_DEV=1 is required for local devWithout it, the SDK assumes cloud mode and demands INNGEST_SIGNING_KEY + INNGEST_EVENT_KEY. All realtime operations 401 / 500. Add to .env.local. Hard restart the dev server (Next.js does not hot-reload .env.local changes).
If your published payload doesn't match the zod schema, the publish fails server-side. Subscriber receives nothing. Catch publish errors during step execution, or run with validate: false in subscribe() if you have a reason to skip schema validation client-side.
import { channel } from 'inngest/realtime' — channel definitionsimport { useRealtime, getClientSubscriptionToken } from 'inngest/react' — React hook + matching token helper (Step 3 + Step 4)import { getSubscriptionToken, subscribe } from 'inngest/realtime' — lower-level helpers for non-React or custom transportstep.realtime.publish(id, topicRef, data) — wraps in a durable stepstep.run: inngest.realtime.publish(topicRef, data) — already inside a memoized step, no wrapping neededinngest.realtime.publish(topicRef, data) — allowed but not retry-safesubscribe(token) returns a stream; subscribe(token, callback) invokes callback per messageChannelInstance → return { channel: string, topics, key, apiBaseUrl }. Not needed with getClientSubscriptionToken.tools
Use when upgrading an existing TypeScript codebase from Inngest SDK v3 to v4, or when fixing mixed v3/v4 API usage. Covers detecting current SDK usage, moving triggers into createFunction options, replacing EventSchemas with eventType/staticSchema, moving serve options to the client, updating realtime imports, rewriting step.invoke string IDs, checkpointing/serverless runtime settings, Connect option changes, and verification.
tools
Use when installing or running the Inngest CLI and Dev Server for local development, local testing, serve endpoint debugging, Docker or Docker Compose setup, MCP configuration, self-hosted `inngest start`, or deployment workflow checks. Covers `inngest dev`, `inngest start`, auto-discovery, config files, environment variables, `@inngest/test`, local event sending, platform gotchas, and production/self-hosted server flags.
development
Use when analyzing an existing TypeScript or JavaScript codebase to decide where and how to introduce Inngest. Covers repository discovery, framework and package detection, finding durability gaps in HTTP handlers, webhooks, cron jobs, queues, long-running jobs, AI agents, polling loops, and side-effect-heavy code, then producing and implementing an incremental integration plan.
tools
Use when the user explicitly asks for the Inngest REST API v2, raw HTTP, OpenAPI, API docs, API authentication, or an endpoint that the Inngest CLI does not expose. Covers api-docs.inngest.com, llms.txt, the OpenAPI v2 spec, Bearer authentication with API keys or signing keys, production and local base URLs, raw curl/fetch requests, request-shape discovery, pagination, secret redaction, and when to prefer the `inngest-api-cli` skill instead.