prompts/skills/clojure-core-async/SKILL.md
Async programming with channels, go blocks, and CSP in Clojure. Use when working with async operations, channel communication, pub/sub patterns, concurrent pipelines, or CSP-style coordination.
npx skillsauth add ramblurr/nix-devenv clojure-core-asyncInstall this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
A Clojure library for async programming and communication using CSP (Communicating Sequential Processes) with channels.
core.async enables writing asynchronous code that looks synchronous, avoiding callback hell through channels and lightweight processes (go blocks).
deps.edn:
org.clojure/core.async {:mvn/version "1.8.741"}
Leiningen:
[org.clojure/core.async "1.8.741"]
See https://search.maven.org (search: org.clojure/core.async) for latest version.
(require '[clojure.core.async :as a :refer [<! >! <!! >!! chan go]])
;; Create a buffered channel
(def c (chan 10))
;; Put and take from ordinary threads (blocking)
(>!! c "hello")
(<!! c) ; => "hello"
;; Use go blocks for lightweight async processes
(let [c (chan)]
(go (>! c "world"))
(println (<!! (go (<! c))))) ; => "world"
;; Timeout after 100ms
(a/alts!! [(chan) (a/timeout 100)])
Channels are queues that carry values between processes.
(chan) ; unbuffered (rendezvous)
(chan 10) ; fixed buffer
(chan (a/dropping-buffer 10)) ; drops newest when full
(chan (a/sliding-buffer 10)) ; drops oldest when full
(chan 10 (map inc)) ; with transducer (must be buffered)
(a/close! c) ; close channel
Important: Channels cannot carry nil - nil from a take means channel is closed and drained.
;; BLOCKING (>!!, <!!) - use in ordinary threads, NOT in go blocks
(>!! c "msg") ; blocks until accepted
(<!! c) ; blocks until available
;; PARKING (>!, <!) - ONLY inside go blocks
(go
(>! c "msg") ; parks go block, doesn't block thread
(<! c)) ; parks until available
;; ASYNC (put!, take!) - callbacks, works anywhere
(a/put! c "msg" #(println "put completed"))
(a/take! c #(println "got:" %))
;; NON-BLOCKING (offer!, poll!) - immediate or fail
(a/offer! c "msg") ; returns true if succeeded
(a/poll! c) ; returns value if available, nil otherwise
Mnemonic: > points into channel (put), < points out (take).
;; go block - lightweight process (parking ops only)
(go
(let [v (<! some-chan)]
(>! result-chan (inc v))))
;; go-loop - infinite processing
(go-loop []
(when-let [v (<! c)]
(println v)
(recur)))
;; thread - for blocking I/O
(a/thread
(Thread/sleep 1000) ; blocking OK here
(<!! some-chan)
"result")
CRITICAL: NEVER use blocking ops (<!!, >!!, Thread/sleep, blocking I/O) inside go blocks!
;; alts! in go block (parking)
(go
(let [[v ch] (a/alts! [c1 c2 c3])]
(println "Got" v "from" ch)))
;; alts!! outside go blocks (blocking)
(a/alts!! [c1 c2 (a/timeout 1000)])
;; With default (non-blocking)
(a/alts!! [c1 c2] :default :nothing)
;; Include puts
(a/alts! [[out-c "value"] in-c])
;; alt - alts with pattern matching
(go
(a/alt!
c1 ([v] (println "c1:" v))
c2 ([v] (println "c2:" v))
(a/timeout 1000) ([_] (println "timeout"))))
;; pipeline - computational work (non-blocking)
(a/pipeline 4 out (map #(* % %)) in)
;; pipeline-blocking - I/O work
(a/pipeline-blocking 4 out
(map (fn [url]
(Thread/sleep 100) ; blocking OK here
(fetch url)))
in)
;; pipeline-async - async callbacks
(defn async-fetch [url result-chan]
(http/get url (fn [response]
(a/put! result-chan response)
(a/close! result-chan))))
(a/pipeline-async 10 out async-fetch in)
;; Create publication with topic function
(def publication (a/pub in-chan :msg-type))
;; Subscribe channels to topics
(a/sub publication :greeting greeting-chan)
(a/sub publication :farewell farewell-chan)
;; Publish messages (to original in-chan)
(>!! in-chan {:msg-type :greeting :text "Hi"})
;; With topic-specific buffers
(a/pub in-chan :type
(fn [topic]
(case topic
:critical (a/dropping-buffer 1000)
:normal (a/sliding-buffer 10)
1)))
;; Create mult from source (copies to ALL taps)
(def m (a/mult source-chan))
(a/tap m listener1)
(a/tap m listener2)
(>!! source-chan "broadcast") ; all listeners receive
(a/onto-chan! c [1 2 3]) ; put coll onto channel
(<!! (a/into [] c)) ; take into vector
(<!! (a/reduce + 0 c)) ; reduce over channel
(a/merge [c1 c2 c3]) ; merge channels
(a/pipe in-chan out-chan) ; pipe one to another
NEVER do blocking operations in go blocks - they will block the entire thread pool:
;; BAD
(go
(<!! some-chan) ; WRONG - use <! instead
(Thread/sleep 1000) ; WRONG - use (<! (timeout 1000))
(.read socket)) ; WRONG - use thread instead
;; GOOD
(a/thread
(<!! some-chan) ; OK
(Thread/sleep 1000) ; OK
(.read socket)) ; OK
Enable go checking in development: -Dclojure.core.async.go-checking=true
Parking ops don't work across function boundaries:
;; BAD: <! inside fn created by map/for
(go (map <! channels))
(go (for [c cs] (<! c)))
;; GOOD: doseq/loop don't create closures
(go (doseq [c cs] (println (<! c))))
(go-loop [cs channels]
(when-let [c (first cs)] (println (<! c)) (recur (rest cs))))
;; BAD: unbuffered channel deadlocks
(let [c (chan)] (>!! c "msg") (<!! c)) ; blocks forever
;; GOOD: use buffer or separate process
(let [c (chan 1)] (>!! c "msg") (<!! c))
(let [c (chan)] (go (>! c "msg")) (<!! c))
;; Never put nil - it's the "closed channel" signal
(>!! c nil) ; BAD - throws
(>!! c ::none) ; GOOD - sentinel value
;; Close from producer, check on consumer
(a/close! c)
(go-loop []
(when-let [v (<! c)] ; nil when closed
(process v)
(recur)))
If ANY subscriber can't accept, WHOLE publication blocks - always buffer subscribers:
(a/sub pub :topic (chan 100)) ; GOOD - buffered
(a/put! c val (fn [_] nil)) ; efficient
(go (>! c val)) ; wasteful
;; Promise channel - single-value (like futures)
(def p (a/promise-chan))
(>!! p "first")
(<!! p) ; => "first" (cached, all readers get same value)
;; Fan-out: one -> many workers
(dotimes [_ 5] (go-loop [] (when-let [v (<! in)] (process v) (recur))))
;; Fan-in: many -> one (use merge)
(a/merge [producer1 producer2 producer3])
testing
Use this OCP when executing or preparing to execute commands that change a live or important system, service reloads/restarts, package changes, deployments, migrations, firewall/network/access changes, credential rotation, NixOS switch/test/boot/deploy, or incident mitigation. It guides safe operations with a persisted ledger for scope, preflight, baseline, rollback, validation, and evidence.
development
Create new agent skills with proper structure, progressive disclosure, and bundled resources. Use when user wants to create, write, or build a new skill.
documentation
Naming conventions for workflow documents in prompts/. Use when creating plans, PRDs, research reports, idea capture or other workflow documents. Triggers on (1) creating new planning documents, (2) naming PRDs or research reports, (3) questions about document organization in prompts/.
testing
Grilling session that challenges your plan against the existing domain model, sharpens terminology, and updates documentation (CONTEXT.md, ADRs) inline as decisions crystallise. Use when user wants to stress-test a plan against their project's language and documented decisions.