skills/walkeros-understanding-sources/SKILL.md
Use when working with walkerOS sources, understanding event capture, or learning about the push interface. Covers browser, dataLayer, and server source patterns.
Sources capture events from the external world (browser DOM, dataLayer, HTTP requests, cloud functions) and feed them to the collector.
Core principle: Sources capture. They don't process or deliver—that's collector and destinations.
See packages/core/src/types/source.ts for canonical interface.
Sources use a context-based initialization pattern:
```typescript
import type { Source } from '@walkeros/core';

export const sourceMySource: Source.Init<Types> = async (context) => {
  const { config = {}, env, logger, id } = context;
  // ...
};
```
Context contains:
| Property | Type | Purpose |
| ----------- | ------------------------------------ | -------------------------------------------------------- |
| config | Source.Config<T> | Settings, mapping, options |
| env | Types['env'] | Environment (push, logger) |
| logger | Logger | Logging functions |
| id | string | Source identifier |
| collector | Collector.Instance | Reference to collector |
| withScope | (raw, respond, body) => Promise<R> | Bind ingest + respond to a single scope (server sources) |
| Method | Purpose |
| ------------- | ----------------------------------- |
| push(input) | Receive external input, emit events |
init?: () => void | Promise<void> — Optional eager-startup hook on the
returned Source.Instance. The factory must be side-effect-free: build the
instance and return it. The collector calls init() on every source eagerly
after all factories register, regardless of config.require. Use init for
work that previously sat in the factory body: draining a pre-init window queue
(e.g., window.elbLayer), attaching DOM listeners, opening sockets,
intercepting window.dataLayer. After init runs the collector flips
Source.Config.init to true.
queueOn?: Array<{ type: On.Types; data: unknown }> — Optional buffer on the
Source.Instance for lifecycle events delivered before the source is
started (started ≡ config.init === true && !config.require?.length). The
collector pushes { type, data } here when it would otherwise call
source.on(type, data). Once the source becomes started, the collector replays
each entry via source.on(...) and clears the queue.
destroy?: DestroyFn — Optional cleanup method. Called during
command('shutdown'). Use to close HTTP servers, timers, or connections.
Receives { id, config, env, logger }.
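The factory / init / destroy split described above can be sketched with simplified stand-in types. Everything here is an assumption made for illustration: `SketchInstance` and `createQueueSource` are hypothetical names, the signatures are synchronous for brevity (the real `Source.Init` is async), and the string queue merely stands in for something like `window.elbLayer`.

```typescript
// Simplified stand-ins; NOT the real @walkeros/core types.
interface SketchInstance {
  type: string;
  push: (input: string) => void;
  init?: () => void;
  destroy?: () => void;
}

// Hypothetical factory: builds the instance and touches nothing external.
function createQueueSource(
  preInitQueue: string[], // stands in for a pre-init queue such as window.elbLayer
  emit: (input: string) => void,
): SketchInstance {
  let active = false;

  const push = (input: string): void => {
    if (active) emit(input);
  };

  return {
    type: 'sketch',
    push,
    // Side effects live here: drain whatever was queued before startup.
    init: () => {
      active = true;
      while (preInitQueue.length > 0) push(preInitQueue.shift() as string);
    },
    // Cleanup: stop emitting.
    destroy: () => {
      active = false;
    },
  };
}

const emitted: string[] = [];
const queue = ['page view', 'product view'];
const source = createQueueSource(queue, (input) => emitted.push(input));
const emittedBeforeInit = emitted.length; // the factory alone emitted nothing

source.init?.(); // drains the queue
source.push('order complete');
source.destroy?.();
source.push('ignored after destroy');
```

Keeping the factory inert means registration order cannot trigger side effects; all external work starts only when `init()` is called.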
The collector and every source agree on three lifecycle markers, all on
Source.Config:
| Field | Purpose |
| -------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| init?: boolean | Set by the collector to true after Instance.init() resolves. Authors do not write to it. Reflects "init has run", not "is started". |
| require?: string[] | Author-supplied timing hint. Lists collector state a source needs before its first on() delivery. Satisfied by the collector's current recorded state (level), not only by a future event — so order does not matter: the gate clears whether the required state was recorded before or after this source registered. It is not a correctness dependency: a source reacts to state correctly whether or not it declares require. |
| disabled?: boolean | Hard skip — no factory invocation, no init, no event capture. |
The flow is:

1. The factory builds and returns a Source.Instance with no side effects.
2. The collector calls Instance.init() on every registered source, then sets config.init = true.
3. On every state event (consent, user, session, run, …) the collector decrements every source's require in place AND reconciles every still-pending source/destination against current state, so a gate also clears from state that was already recorded. If a source is started, it calls source.on(type, data) directly. Otherwise it pushes { type, data } into Instance.queueOn.
4. Once a source becomes started, the collector replays queueOn via source.on(...) and clears the queue.

require therefore gates the timing of on() delivery, not code execution
and not correctness. Instance.init() always runs eagerly. There is no
collector.pending.sources map: per-source state lives entirely on
Source.Instance and Source.Config.
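The deliver-or-queue rule can be sketched as follows. This is a simplified model, not the collector's code: `SketchSource`, `isStarted`, and `deliver` are hypothetical names, and the sketch omits the reconciliation against already-recorded state.

```typescript
// Simplified stand-ins; NOT the real @walkeros/core types.
interface SketchConfig {
  init?: boolean;
  require?: string[];
}
interface QueuedOn {
  type: string;
  data: unknown;
}
interface SketchSource {
  config: SketchConfig;
  queueOn: QueuedOn[];
  on: (type: string, data: unknown) => void;
}

// started ≡ config.init === true && !config.require?.length
function isStarted(config: SketchConfig): boolean {
  return config.init === true && !config.require?.length;
}

// Hypothetical collector-side routing for one lifecycle event.
function deliver(source: SketchSource, type: string, data: unknown): void {
  // Clear the matching require entry in place.
  const req = source.config.require;
  if (req) {
    const idx = req.indexOf(type);
    if (idx !== -1) req.splice(idx, 1);
  }

  // Not started yet: buffer instead of calling on().
  if (!isStarted(source.config)) {
    source.queueOn.push({ type, data });
    return;
  }

  // Started: replay anything buffered, in order, then deliver the new event.
  const pending = source.queueOn.splice(0);
  for (const entry of pending) source.on(entry.type, entry.data);
  source.on(type, data);
}

const seen: string[] = [];
const gated: SketchSource = {
  config: { init: true, require: ['consent'] },
  queueOn: [],
  on: (type) => seen.push(type),
};

deliver(gated, 'user', { id: 'u1' }); // buffered: require not yet met
const bufferedCount = gated.queueOn.length;
deliver(gated, 'consent', { marketing: true }); // gate clears, queue replays
```

Note that `init` has already run in this example (`config.init` is `true`); only `on()` delivery was held back by `require`.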
State commands (consent, user, globals, custom) are recorded by the
collector immediately, even before run, and delivered to each source's on
handler exactly once per change. The collector tracks what each subscriber has
already received, so re-running or re-registering never double-fires a state
reaction. Sources should not hand-roll their own deduplication for state
deliveries: the collector enforces exactly-once, and delivery is
order-independent (it does not depend on source init order or on whether the
state arrived before or after run).
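One way to picture exactly-once, order-independent delivery is a ledger that versions each state and remembers the last version each subscriber received. This is purely illustrative: `StateLedger` and its methods are hypothetical, and the collector's real bookkeeping may work differently.

```typescript
// Hypothetical bookkeeping; the collector's real implementation may differ.
type StateType = 'consent' | 'user' | 'globals' | 'custom';

interface Recorded {
  version: number;
  data: unknown;
}

class StateLedger {
  private current: Partial<Record<StateType, Recorded>> = {};
  private delivered: Record<string, Partial<Record<StateType, number>>> = {};

  // Record a state change immediately, whether or not anyone is listening yet.
  record(type: StateType, data: unknown): void {
    const prev = this.current[type]?.version ?? 0;
    this.current[type] = { version: prev + 1, data };
  }

  // Deliver every state the subscriber has not yet seen at its current version.
  reconcile(id: string, on: (type: StateType, data: unknown) => void): void {
    const seen = (this.delivered[id] = this.delivered[id] ?? {});
    for (const type of Object.keys(this.current) as StateType[]) {
      const entry = this.current[type];
      if (!entry || seen[type] === entry.version) continue; // already delivered
      seen[type] = entry.version;
      on(type, entry.data);
    }
  }
}

const ledger = new StateLedger();
const calls: string[] = [];
const onState = (type: StateType): void => {
  calls.push(type);
};

ledger.record('consent', { marketing: true }); // recorded before any subscriber exists
ledger.reconcile('browser', onState); // delivers consent once
ledger.reconcile('browser', onState); // no-op: nothing changed
ledger.record('consent', { marketing: false });
ledger.reconcile('browser', onState); // delivers the change
```

Because deduplication lives in the ledger, the subscriber's handler stays free of its own "have I seen this?" checks.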
This mirrors the destination model: Destination.Instance.init handles one-time
bootstrap, Destination.Config.init is the collector-managed "init has run"
flag, and Destination.Config.require gates event delivery the same way. See
walkeros-understanding-destinations.
| Source Type | Signature | Example |
| -------------- | ----------------------------------- | ------------ |
| Cloud Function | push(req, res) → Promise<void> | HTTP handler |
| Browser | push(event, data) → Promise<void> | DOM events |
| DataLayer | push(event, data) → Promise<void> | GTM-style |
Key insight: Source push IS the handler. No wrappers needed.
```typescript
// Direct deployment
http('handler', source.push);
```
When a test or integration code needs to invoke a source's push through the
collector bag, collector.sources erases the per-source generic on read. Use
Source.getSource<T>(collector, id) to recover the narrow signature without a
cast. See the testing-strategy skill for the full pattern (symmetric helpers
exist for destinations, transformers, stores).
| Type | Path | Examples |
| ------ | -------------------------- | ------------------ |
| Web | packages/web/sources/ | browser, dataLayer |
| Server | packages/server/sources/ | gcp |
The browser source captures events from DOM using data attributes.
```html
<button data-elb="product" data-elb-product="id:P123;name:Laptop">
  <span data-elbaction="click">Add to Cart</span>
</button>
```
See packages/web/sources/browser/ for implementation.
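To illustrate how an attribute value such as `id:P123;name:Laptop` maps to event data, here is a deliberately simplified parser. `parseElbData` is a hypothetical helper that assumes plain `key:value` pairs separated by `;`; the actual browser source supports more syntax, so treat the implementation path above as the reference.

```typescript
// Simplified parser; assumes "key:value" pairs separated by ";".
// The real browser source handles more cases than this sketch.
function parseElbData(attribute: string): Record<string, string> {
  const data: Record<string, string> = {};
  for (const pair of attribute.split(';')) {
    const separator = pair.indexOf(':');
    if (separator === -1) continue; // skip empty or malformed segments
    const key = pair.slice(0, separator).trim();
    const value = pair.slice(separator + 1).trim();
    if (key) data[key] = value;
  }
  return data;
}

const productData = parseElbData('id:P123;name:Laptop');
```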
Captures events from a GTM-style dataLayer array.
```typescript
window.dataLayer.push({
  event: 'product view',
  product: { id: 'P123', name: 'Laptop' },
});
```
See packages/web/sources/dataLayer/ for implementation.
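Capturing from a dataLayer-style array generally means wrapping its `push`. The sketch below shows that general technique only; `interceptDataLayer` is a hypothetical helper, and replaying entries that already exist in the array is an assumption of this example, not a statement about what the walkerOS dataLayer source does.

```typescript
type DataLayerEntry = Record<string, unknown>;

// Hypothetical helper: wrap push so each entry is also forwarded to a handler.
function interceptDataLayer(
  dataLayer: DataLayerEntry[],
  handler: (entry: DataLayerEntry) => void,
): () => void {
  const originalPush = dataLayer.push;

  // Assumption of this sketch: also process entries pushed before interception.
  for (const entry of dataLayer) handler(entry);

  dataLayer.push = function (...entries: DataLayerEntry[]): number {
    for (const entry of entries) handler(entry);
    return originalPush.apply(dataLayer, entries);
  };

  // Return a restore function so the interception can be undone.
  return () => {
    dataLayer.push = originalPush;
  };
}

const layer: DataLayerEntry[] = [{ event: 'page view' }];
const captured: unknown[] = [];
const restore = interceptDataLayer(layer, (entry) => captured.push(entry.event));

layer.push({ event: 'product view', product: { id: 'P123', name: 'Laptop' } });
restore();
layer.push({ event: 'not captured' });
```

The array keeps working as before for other consumers, because the wrapper still calls the original `push`.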
Handle HTTP requests in cloud functions. Server sources use the context pattern:
```typescript
import type { Source } from '@walkeros/core';

export const sourceCloudFunction: Source.Init<Types> = async (context) => {
  const { config = {}, env } = context;
  const { push: envPush } = env;

  // Apply defaults inline. flow.json is developer-controlled, so no
  // runtime validation. Shape checks live in ./schemas and are used by
  // `walkeros validate` and dev tooling, never at runtime.
  const userSettings = config.settings || {};
  const settings = {
    ...userSettings,
    // example default: port: userSettings.port ?? 3000,
  };

  const push = async (req: Request, res: Response): Promise<void> => {
    // Transform HTTP request → walkerOS event
    const event = transformRequest(req);
    await envPush(event);
    res.json({ success: true });
  };

  return { type: 'cloudfunction', config: { ...config, settings }, push };
};
```

```typescript
// Direct deployment
export const handler = source.push;
```
See packages/server/sources/gcp/ for implementation.
Platform dependencies go through env with fallback to globals or direct
imports. This enables testing and simulation without touching globals.
```typescript
// Express source: env.express ?? express (import fallback)
const expressLib = env.express ?? express;
const app = expressLib();

// Web sources: env.window ?? window (global fallback)
const win = env.window ?? window;
const doc = env.document ?? document;
```
Every source's Env interface extends Source.BaseEnv with optional platform
deps:
```typescript
export interface Env extends Source.BaseEnv {
  window?: Window & typeof globalThis; // web sources
  document?: Document; // web sources
  express?: typeof express; // express source
  cors?: typeof cors; // express source
}
```
Tests inject mocks via env instead of mocking globals. See
testing-strategy.
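A minimal sketch of why this helps in tests: code that reads its platform dependency from `env` first can be exercised with a plain object instead of a patched global. `DocumentLike`, `SketchEnv`, and `readPageTitle` are hypothetical names for illustration and are not part of any walkerOS package.

```typescript
// Minimal stand-ins for the platform pieces a web source might read.
interface DocumentLike {
  title: string;
}
interface SketchEnv {
  document?: DocumentLike;
}

// Hypothetical helper: prefer the injected dependency, fall back to the global.
function readPageTitle(env: SketchEnv = {}): string | undefined {
  const globalDoc = (globalThis as { document?: DocumentLike }).document;
  const doc = env.document ?? globalDoc;
  return doc?.title;
}

// In a test, pass a mock instead of patching globals.
const mockedTitle = readPageTitle({ document: { title: 'Checkout' } });
```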
Sources can wire to pre-collector transformer chains via the next property:
```typescript
sources: {
  browser: {
    code: sourceBrowser,
    next: 'validate' // First transformer to run after this source
  }
}
```
The transformer chain runs before events reach the collector. See understanding-transformers for chain details.
A single source factory instance handles many concurrent invocations: Express
processes overlapping requests, Lambda reuses one handler across calls, queue
consumers loop over messages. Each logical unit of work is a scope. Server
sources MUST wrap each invocation with
context.withScope(rawScope, respond, body):
```typescript
const push = async (req, res) => {
  const respond = createRespond((options) => {
    /* wire options into res */
  });
  await context.withScope(req, respond, async (env) => {
    await env.push(parsedData);
  });
};
```
Inside body, env.push carries that scope's ingest (extracted from
rawScope via config.ingest mapping) and respond end to end through the
pipeline. Concurrent scopes never share ingest or respond.
Browser sources skip withScope. A browser tab is a single logical scope
for its lifetime; calling env.push directly is correct.
When a server source passes a respond to withScope, every transformer and
destination in the pipeline can call
env.respond?.({ body, status?, headers? }) to customize the HTTP response.
First call wins (createRespond is idempotent), so the source's default
response is a no-op if a step already responded.
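The first-call-wins behaviour can be pictured with a small wrapper. `respondOnce` below is a hypothetical stand-in written for this example; it is not the `createRespond` implementation, only a sketch of the idempotency described above.

```typescript
interface RespondOptions {
  body?: unknown;
  status?: number;
  headers?: Record<string, string>;
}

// Hypothetical stand-in for createRespond: only the first call reaches `send`.
function respondOnce(send: (options: RespondOptions) => void) {
  let responded = false;
  return (options: RespondOptions = {}): void => {
    if (responded) return; // later calls are no-ops
    responded = true;
    send(options);
  };
}

const sent: RespondOptions[] = [];
const respond = respondOnce((options) => sent.push(options));

respond({ status: 202, body: { queued: true } }); // e.g. a destination responds first
respond({ status: 200, body: { success: true } }); // the source's default is ignored
```

With this shape, a source can always issue its default response at the end of `push` without checking whether an earlier step already answered.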
See @walkeros/server-source-express for the reference implementation.
Every source exports a createTrigger factory from its examples (dev entry)
that follows the unified Trigger.CreateFn interface:
```typescript
type CreateFn<TContent, TResult> = (
  config: Collector.InitConfig,
  options?: unknown,
) => Promise<Trigger.Instance<TContent, TResult>>;
```
createTrigger simulates real-world invocations from the outside — full
blackbox, no source instance access. Each package implements it differently:
| Source | Content | Trigger type | Mechanism |
| ------- | -------------- | ------------------ | ----------------------- |
| Browser | HTML string | click, load... | DOM injection + events |
| Express | HTTP req shape | POST, GET | Real fetch() requests |
The trigger lazily calls startFlow(config) on first invocation. Tests capture
events via spy destinations. See using-step-examples for testing patterns.
Sources can implement an optional setup() lifecycle to provision external
resources, for example registering a webhook with a third-party provider,
creating a Pub/Sub subscription, or pre-allocating queue resources. Setup is
never invoked by the runtime, push, init, or deploy. It runs only when an
operator explicitly types walkeros setup source.<name>.
The signature is
(ctx: LifecycleContext<Config<T>, Env<T>>) => Promise<unknown>, where
LifecycleContext carries { id, config, env, logger }. Idempotency is the
package's responsibility: the framework adds no opinion. Use
resolveSetup(ctx.config.setup, DEFAULTS) from @walkeros/core to normalize
the boolean | object shape into a concrete options object.
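The idea of normalizing a `boolean | object` setting can be sketched as below. This is not `resolveSetup` itself: `normalizeSetup`, `SetupOptions`, and the default values are hypothetical, and treating `false` or `undefined` as "no setup" is an assumption of this sketch rather than documented behaviour.

```typescript
// Hypothetical options shape for a source's setup step.
interface SetupOptions {
  retries: number;
  region: string;
}

const DEFAULTS: SetupOptions = { retries: 3, region: 'eu' };

// Hypothetical normalizer; an illustration of the idea, not resolveSetup itself.
function normalizeSetup(
  setup: boolean | Partial<SetupOptions> | undefined,
  defaults: SetupOptions,
): SetupOptions | undefined {
  if (setup === undefined || setup === false) return undefined; // assumption: disabled
  if (setup === true) return { ...defaults }; // plain flag: use defaults as-is
  return { ...defaults, ...setup }; // object: overrides win over defaults
}

const fromFlag = normalizeSetup(true, DEFAULTS);
const fromObject = normalizeSetup({ region: 'us' }, DEFAULTS);
const disabled = normalizeSetup(false, DEFAULTS);
```

Resolving the shape once at the top of `setup()` keeps the rest of the function working against a single concrete options object.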
See walkeros-create-source,
walkeros-understanding-destinations,
walkeros-understanding-stores, and
the walkeros setup CLI documentation for the authoring template and operator
workflow.