skills/golang-samber-ro/SKILL.md
Reactive streams and event-driven programming in Golang using samber/ro — ReactiveX implementation with 150+ type-safe operators, cold/hot observables, 5 subject types (Publish, Behavior, Replay, Async, Unicast), declarative pipelines via Pipe, 40+ plugins (HTTP, cron, fsnotify, JSON, logging), automatic backpressure, error propagation, and Go context integration. Apply when using or adopting samber/ro, when the codebase imports github.com/samber/ro, or when building asynchronous event-driven pipelines, real-time data processing, streams, or reactive architectures in Go. Not for finite slice transforms (→ See `golang-samber-lo` skill).
npx skillsauth add rockcookies/skills golang-samber-ro
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more information. Context7 can help as a discoverability platform.
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.
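To make the pain point concrete, here is a minimal stdlib-only sketch of a three-stage pipeline (generate, keep even values, format as a string) wired by hand. The function names `generate`, `filterEven`, `label`, `collect`, and `runPipeline` are hypothetical and not part of samber/ro. Every stage has to repeat the same goroutine, `close`, and `select` boilerplate that a chainable operator would hide.

```go
package main

import (
	"context"
	"fmt"
)

// generate emits start..start+count-1 and closes the channel when it is done
// or when ctx is cancelled.
func generate(ctx context.Context, start, count int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := start; i < start+count; i++ {
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// filterEven forwards only even values to the next stage.
func filterEven(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if v%2 != 0 {
				continue
			}
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// label maps each value to a string.
func label(ctx context.Context, in <-chan int) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for v := range in {
			select {
			case out <- fmt.Sprintf("even-%d", v):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// collect drains the final stage into a slice.
func collect(in <-chan string) []string {
	var result []string
	for s := range in {
		result = append(result, s)
	}
	return result
}

// runPipeline wires the stages together and cancels them on return.
func runPipeline() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return collect(label(ctx, filterEven(ctx, generate(ctx, 0, 5))))
}

func main() {
	fmt.Println(runPipeline()) // [even-0 even-2 even-4]
}
```

Error propagation is not even covered here; adding it would mean a second channel or a result struct threaded through every stage.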
When to use which tool:
| Scenario | Tool | Why |
| --- | --- | --- |
| Transform a slice (map, filter, reduce) | samber/lo | Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | errgroup | Part of golang.org/x/sync (not the standard library), lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | samber/ro | Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | samber/ro | CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | samber/ro | Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | samber/lo | samber/ro |
| --- | --- | --- |
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return (T, error) per call | Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
go get github.com/samber/ro
Four building blocks:
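1. Observable: a data source that emits values over time. Cold by default, so each subscriber triggers an independent execution.
2. Observer: a consumer with three callbacks, `onNext(T)`, `onError(error)`, and `onComplete()`.
3. Operator: a function that transforms one observable into another, chained via `Pipe`.
4. Subscription: the connection between an observable and an observer. Call `.Wait()` to block or `.Unsubscribe()` to cancel.

```go
// Requires imports: fmt, log, time, github.com/samber/ro
observable := ro.Pipe2(
    ro.RangeWithInterval(0, 5, 1*time.Second),
    ro.Filter(func(x int) bool { return x%2 == 0 }),
    ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)

observable.Subscribe(ro.NewObserver(
    func(s string) { fmt.Println(s) },    // onNext
    func(err error) { log.Println(err) }, // onError
    func() { fmt.Println("Done!") },      // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"

// Or collect synchronously:
values, err := ro.Collect(observable)
```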
Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
| --- | --- |
| Share() | Cold → hot with reference counting. Last unsubscribe tears down |
| ShareReplay(n) | Same as Share + buffers last N values for late subscribers |
| Connectable() | Cold → hot, but waits for explicit .Connect() call |
| Subjects | Natively hot — call .Send(), .Error(), .Complete() directly |
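The cold/hot distinction can be modelled in a few lines of plain Go. The sketch below is a simplified, synchronous model and not samber/ro's implementation; `coldSource`, `hotSource`, and `demo` are hypothetical names. It shows that a cold source re-runs its producer for each subscriber, while a hot source multicasts one execution, so a late subscriber misses earlier values.

```go
package main

import (
	"fmt"
	"sync"
)

// coldSource models a cold observable: every subscribe call runs the
// producer again, so each subscriber receives the full sequence.
type coldSource struct {
	runs int // number of times the producer was started
}

func (c *coldSource) subscribe(onNext func(int)) {
	c.runs++
	for i := 1; i <= 3; i++ {
		onNext(i)
	}
}

// hotSource models a hot observable: values are multicast to whoever is
// subscribed at the moment of emission.
type hotSource struct {
	mu   sync.Mutex
	subs []func(int)
}

func (h *hotSource) subscribe(onNext func(int)) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.subs = append(h.subs, onNext)
}

func (h *hotSource) emit(v int) {
	h.mu.Lock()
	subs := append([]func(int){}, h.subs...) // copy so callbacks run unlocked
	h.mu.Unlock()
	for _, sub := range subs {
		sub(v)
	}
}

// demo returns what each subscriber observed, plus the cold producer run count.
func demo() ([]int, []int, []int, []int, int) {
	var coldA, coldB, hotEarly, hotLate []int

	cold := &coldSource{}
	cold.subscribe(func(v int) { coldA = append(coldA, v) })
	cold.subscribe(func(v int) { coldB = append(coldB, v) })

	hot := &hotSource{}
	hot.subscribe(func(v int) { hotEarly = append(hotEarly, v) })
	hot.emit(1)
	hot.subscribe(func(v int) { hotLate = append(hotLate, v) }) // joins late
	hot.emit(2)
	hot.emit(3)

	return coldA, coldB, hotEarly, hotLate, cold.runs
}

func main() {
	coldA, coldB, hotEarly, hotLate, runs := demo()
	fmt.Println(coldA, coldB, runs) // [1 2 3] [1 2 3] 2
	fmt.Println(hotEarly, hotLate)  // [1 2 3] [2 3]
}
```

The run count of 2 is the cost that sharing avoids when the producer is a WebSocket connection or a database poll.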
| Subject | Constructor | Replay behavior |
| --- | --- | --- |
| PublishSubject | NewPublishSubject[T]() | None — late subscribers miss past events |
| BehaviorSubject | NewBehaviorSubject[T](initial) | Replays last value to new subscribers |
| ReplaySubject | NewReplaySubject[T](bufferSize) | Replays last N values |
| AsyncSubject | NewAsyncSubject[T]() | Emits only last value, only on complete |
| UnicastSubject | NewUnicastSubject[T](bufferSize) | Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
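The replay column in the table above comes down to how many past values a subject keeps for late subscribers. The following is a hypothetical single-goroutine model, not samber/ro's code: `replaySubject`, `send`, `subscribe`, and `lateSubscriber` are illustrative names. A buffer size of 0 behaves like a publish subject, 1 approximates a behavior subject (minus the initial value), and N gives replay of the last N values.

```go
package main

import "fmt"

// replaySubject remembers the last bufferSize values for late subscribers.
// It is an illustrative model and is not safe for concurrent use.
type replaySubject[T any] struct {
	bufferSize int
	buffer     []T
	subs       []func(T)
}

func newReplaySubject[T any](bufferSize int) *replaySubject[T] {
	return &replaySubject[T]{bufferSize: bufferSize}
}

// send records v for late subscribers and delivers it to current ones.
func (s *replaySubject[T]) send(v T) {
	if s.bufferSize > 0 {
		s.buffer = append(s.buffer, v)
		if len(s.buffer) > s.bufferSize {
			s.buffer = s.buffer[1:] // drop the oldest value
		}
	}
	for _, sub := range s.subs {
		sub(v)
	}
}

// subscribe replays buffered values first, then registers for new ones.
func (s *replaySubject[T]) subscribe(onNext func(T)) {
	for _, v := range s.buffer {
		onNext(v)
	}
	s.subs = append(s.subs, onNext)
}

// lateSubscriber sends 1, 2, 3, subscribes, sends 4, and returns what the
// late subscriber observed.
func lateSubscriber(bufferSize int) []int {
	s := newReplaySubject[int](bufferSize)
	var seen []int
	s.send(1)
	s.send(2)
	s.send(3)
	s.subscribe(func(v int) { seen = append(seen, v) })
	s.send(4)
	return seen
}

func main() {
	fmt.Println(lateSubscriber(0)) // publish-like: [4]
	fmt.Println(lateSubscriber(1)) // behavior-like: [3 4]
	fmt.Println(lateSubscriber(2)) // replay of last 2: [2 3 4]
}
```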
| Category | Key operators | Purpose |
| --- | --- | --- |
| Creation | Just, FromSlice, FromChannel, Range, Interval, Defer, Future | Create observables from various sources |
| Transform | Map, MapErr, FlatMap, Scan, Reduce, GroupBy | Transform or accumulate stream values |
| Filter | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last | Selectively emit values |
| Combine | Merge, Concat, Zip2–Zip6, CombineLatest2–CombineLatest5, Race | Merge multiple observables |
| Error | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig | Recover from errors |
| Timing | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime | Control emission timing |
| Side effect | Tap/Do, TapOnNext, TapOnError, TapOnComplete | Observe without altering stream |
| Terminal | Collect, ToSlice, ToChannel, ToMap | Consume stream into Go types |
Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
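Scan and Reduce are easy to confuse because both fold values with an accumulator. In ReactiveX terms, Scan emits every intermediate accumulator while Reduce emits only the final one on completion. The sketch below models that difference with plain channels and generics; `fromSlice`, `scan`, `reduce`, and `toSlice` are hypothetical helpers rather than the library's operators, and edge cases such as empty streams may behave differently in samber/ro.

```go
package main

import "fmt"

// fromSlice emits each element, then closes the channel to signal completion.
func fromSlice[T any](items []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, it := range items {
			out <- it
		}
	}()
	return out
}

// scan emits the running accumulator after every input value.
func scan[T, A any](in <-chan T, seed A, f func(A, T) A) <-chan A {
	out := make(chan A)
	go func() {
		defer close(out)
		acc := seed
		for v := range in {
			acc = f(acc, v)
			out <- acc
		}
	}()
	return out
}

// reduce emits a single accumulated value once the input completes.
func reduce[T, A any](in <-chan T, seed A, f func(A, T) A) <-chan A {
	out := make(chan A)
	go func() {
		defer close(out)
		acc := seed
		for v := range in {
			acc = f(acc, v)
		}
		out <- acc
	}()
	return out
}

// toSlice drains a channel into a slice.
func toSlice[T any](in <-chan T) []T {
	var result []T
	for v := range in {
		result = append(result, v)
	}
	return result
}

func add(acc, v int) int { return acc + v }

func main() {
	fmt.Println(toSlice(scan(fromSlice([]int{1, 2, 3, 4}), 0, add)))   // [1 3 6 10]
	fmt.Println(toSlice(reduce(fromSlice([]int{1, 2, 3, 4}), 0, add))) // [10]
}
```

On an infinite stream the reduce stage never emits, which is why running totals over live data call for Scan.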
| Mistake | Why it fails | Fix |
| --- | --- | --- |
| Using ro.OnNext() without error handler | Errors are silently dropped — bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
| Using untyped Pipe() instead of Pipe2/Pipe3 | Loses compile-time type safety, errors surface at runtime | Use Pipe2, Pipe3...Pipe25 for typed operator chains |
| Forgetting .Unsubscribe() on infinite streams | Goroutine leak — the observable runs forever | Use TakeUntil(signal), context cancellation, or explicit Unsubscribe() |
| Using Share() when cold is sufficient | Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
| Using samber/ro for finite slice transforms | Stream overhead (goroutines, subscriptions) for a synchronous operation | Use samber/lo — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline |
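The goroutine-leak and context rows share one root cause: an infinite producer with no stop signal. As a stdlib-only illustration (not samber/ro code; `ticks` and `takeN` are hypothetical names), the sketch below ties a ticker-driven producer to a `context.Context`, so that cancelling the context ends the goroutine and closes the channel.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// ticks emits an increasing counter every period until ctx is done, then
// closes the channel so the producer goroutine does not leak.
func ticks(ctx context.Context, period time.Duration) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		ticker := time.NewTicker(period)
		defer ticker.Stop()
		for i := 0; ; i++ {
			// Wait for the next tick, or stop on cancellation.
			select {
			case <-ticker.C:
			case <-ctx.Done():
				return
			}
			// Deliver the value, or stop if the consumer went away.
			select {
			case out <- i:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// takeN reads at most n values and then cancels the producer.
func takeN(n int, period time.Duration) []int {
	if n <= 0 {
		return nil
	}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // releases the producer goroutine on return

	var got []int
	for v := range ticks(ctx, period) {
		got = append(got, v)
		if len(got) == n {
			break
		}
	}
	return got
}

func main() {
	fmt.Println(takeN(3, 10*time.Millisecond)) // [0 1 2]
}
```

Without the deferred `cancel`, breaking out of the loop would leave the producer blocked on its next send forever.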
1. Always handle all three events: use `NewObserver(onNext, onError, onComplete)`, not just `OnNext`. Unhandled errors cause silent data loss.
2. Use `Collect()` for synchronous consumption: when the stream is finite and you need `[]T`, `Collect` blocks until completion and returns the slice and the error.
3. Prefer the typed pipes: `Pipe2`, `Pipe3` ... `Pipe25` catch type mismatches at compile time. Reserve untyped `Pipe` for dynamic operator chains.
4. Bound infinite streams: use `Take(n)`, `TakeUntil(signal)`, `Timeout(d)`, or context cancellation. Unbounded streams leak goroutines.
5. Use `Tap`/`Do` for observability: log, trace, or meter emissions without altering the stream. Chain `TapOnError` for error monitoring.
6. Prefer `samber/lo` for simple transforms: if the data is a finite slice and you need Map/Filter/Reduce, use lo. Reach for ro when data arrives over time, comes from multiple sources, or needs retry/timeout/backpressure.

40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
| --- | --- | --- |
| Encoding | JSON, CSV, Base64, Gob | plugins/encoding/... |
| Network | HTTP, I/O, FSNotify | plugins/http, plugins/io, plugins/fsnotify |
| Scheduling | Cron, ICS | plugins/cron, plugins/ics |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | plugins/observability/..., plugins/samber/oops |
| Rate limiting | Native, Ulule | plugins/ratelimit/... |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | plugins/bytes, plugins/strings, etc. |
| System | Process, Signal | plugins/proc, plugins/signal |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
- `golang-samber-lo` skill for finite slice transforms (Map, Filter, Reduce, GroupBy); use lo when data is already in a slice
- `golang-samber-mo` skill for monadic types (Option, Result, Either) that compose with ro pipelines
- `golang-samber-hot` skill for in-memory caching (also available as an ro plugin)
- `golang-concurrency` skill for goroutine/channel patterns when reactive streams are overkill
- `golang-observability` skill for monitoring reactive pipelines in production