flowing/SKILL.md
DAG workflow runner that encodes control flow in code, not prose. Use when a procedure has 3+ steps with branching, retries, or validation that must be enforced — gates as `when=`, edge contracts as `validate=`, predicate loops as `retry_until=`. The runner owns the graph; the LLM provides leaves. Also covers parallel execution, checkpoint resume, detached side-effects.
npx skillsauth add oaustegard/claude-skills flowingInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Claude Code's dynamic workflows orchestrate subagents (separate contexts, fan-out to 16-concurrent / 1000-agent). This skill is a different primitive: single-context control flow over YOUR OWN tool calls, with durable side-effects and checkpoint resume. The workflows runtime explicitly cannot touch the filesystem or shell directly — its agents do the work and the script only coordinates them. Flowing is the inverse: the script does the work.
Use flowing for an in-context pipeline (3+ steps, branches, retries, validation, detached side-effects). Use a workflow when you need many subagents. They compose; they do not compete. Do not abandon flowing for a workflow — you would lose the durable side-effects and the cross-session checkpoint that hub-spoke depends on.
When a procedure needs 3+ steps with branches, retries, or contracts, encode it as a DAG of Python tasks instead of prose imperatives. Prose like "first X, then Y, then if Z retry 3×" is read and generated past. A @task graph is structural: a step physically cannot run until its inputs are bound, and gates that fire on bad inputs can't be skipped.
The runner owns control flow — branching, retrying, validating, propagating failures, parallelizing. You provide judgment at the leaves. Runner: scripts/flowing.py.
from flowing import task, Flow
@task
def fetch_data():
return {"items": [1, 2, 3]}
@task(depends_on=[fetch_data])
def process(fetch_data): # param name must match the dep's name
return sum(fetch_data["items"])
@task(depends_on=[process])
def store(process):
print(f"Result: {process}")
Flow(store).run() # topo-sorts, runs each layer, parallel within a layer
Each task receives its dependencies as kwargs named after them. Independent tasks in the same layer run in parallel.
Encode branches and contracts as graph structure, not if statements inside task bodies.
when= — conditional gateRun the task only if the predicate (over gathered dep values) is truthy. Falsy → SKIPPED, and the skip propagates to dependents.
@task(depends_on=[fetch], when=lambda fetch: fetch["needs_processing"])
def process(fetch):
return transform(fetch["payload"])
validate= — edge contractCheck gathered dep values before the body runs. Raise → FAILED with no retry (bad inputs don't fix themselves). Pass → proceed.
def must_have_items(fetch):
if not fetch.get("items"):
raise ValueError("fetch returned empty payload")
@task(depends_on=[fetch], validate=must_have_items)
def process(fetch):
return sum(fetch["items"])
retry_until= — predicate-driven loopRun the body, then call retry_until(value). True → done. False → retry, consuming the retry= budget. Use for self-correcting LLM steps: generate, check, regenerate.
@task(retry=4, retry_until=lambda r: r["valid"])
def generate_until_valid():
candidate = llm_call(...)
return {"valid": passes_schema(candidate), "candidate": candidate}
Distinct from retry= alone, which only retries on a raised exception.
max_workers=).detached=True — side-effect tasks (memory writes, notifications) that run after the main DAG and never block it on failure.flow.run() → fix → flow.resume() re-runs from the failure point, keeping succeeded tasks cached. flow.override(task, value) injects a corrected result.timeout_s=, retry= with exponential backoff, fail_fast=.Read references/reference.md before using anything beyond the quick start and the three primitives above — it covers every @task parameter, the Flow methods, resume/override, detached auto-discovery, and the validate=/when= signature-matching gotcha.
when= makes them structural.validate= makes them enforceable.retry_until= puts the check in the loop.detached=True.If you find yourself writing prose like "first call X, validate Y, then if Z retry up to 3 times" — that is a flowing graph. Refactor before shipping. Prose imperatives don't enforce; @task graphs do.
testing
Disciplined, validation-gated revision of an EXISTING skill so each edit is a measured improvement rather than a guess. Use when editing, revising, or tuning a skill that already exists and there is evidence it underperforms (observed failures, drift, complaints) — invoke by name, or have versioning-skills / creating-skill defer to it before applying edits. Not for authoring a brand-new skill from scratch (use creating-skill) or one-off prose.
development
Skill-aware orchestration with context routing. Decomposes complex tasks into skill-typed subtasks, extracts targeted context subsets, executes subagents in parallel, and synthesizes results. Self-answers trivial lookups inline. No SDK dependency — uses raw HTTP via httpx. Use when tasks require multiple analytical perspectives, when context is large and subtasks only need portions, or when orchestrating-agents spawns too many redundant subagents.
tools
Orchestrates parallel API instances, delegated sub-tasks, and multi-agent workflows with streaming and tool-enabled delegation patterns. Use for parallel analysis, multi-perspective reviews, or complex task decomposition.
development
Invokes Google Gemini models for structured outputs, image generation, multi-modal tasks, and Google-specific features. Use when users request Gemini, image generation, structured JSON output, Google API integration, or cost-effective parallel processing.