library/specializations/web-development/skills/rxjs/SKILL.md
RxJS reactive programming patterns including operators, error handling, multicasting, and Angular integration.
npx skillsauth add a5c-ai/babysitter rxjs
Install this skill globally with one command. Works with Claude Code, Cursor, and Windsurf.
3 of 9 scanners reported clean
Some scanners were skipped, did not run, or reported a non-clean status. Review each row below.
Expert assistance for implementing reactive programming with RxJS in Angular and JavaScript applications.
Invoke this skill when you need to:
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| useCase | string | Yes | One of: http, websocket, events, state |
| operators | array | No | Specific operators needed |
| errorHandling | boolean | No | Include error handling |
{
"useCase": "http",
"operators": ["switchMap", "debounceTime", "catchError"],
"errorHandling": true
}
import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
Observable,
Subject,
BehaviorSubject,
catchError,
debounceTime,
distinctUntilChanged,
switchMap,
retry,
shareReplay,
tap,
map,
of,
} from 'rxjs';
/**
 * Debounced user search on top of `HttpClient`.
 *
 * Callers push raw input with `search()` and subscribe to `results$` to
 * receive the matching users for the most recent term.
 */
@Injectable({ providedIn: 'root' })
export class SearchService {
  private http = inject(HttpClient);
  private searchTerms = new Subject<string>();

  // Shared cache with replay: the request is retried up to 3 times, the
  // latest successful list is replayed to later subscribers, and a failure
  // surfaces to each subscriber as an empty list.
  // NOTE(review): nothing in this class reads users$ — confirm it is used
  // elsewhere or intended purely as an example.
  private users$ = this.http.get<User[]>('/api/users').pipe(
    retry(3),
    shareReplay(1),
    catchError(() => of([]))
  );

  /** Queues a new search term; results are delivered through `results$`. */
  search(term: string) {
    this.searchTerms.next(term);
  }

  /**
   * Search results for the latest term.
   *
   * Terms are debounced by 300 ms and consecutive duplicates are skipped.
   * Terms shorter than two characters yield an empty list without issuing a
   * request. A failed request is mapped to an empty list so the outer stream
   * stays alive for the next term.
   */
  results$ = this.searchTerms.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap((term) =>
      term.length < 2
        ? of([])
        : this.http
            // Encode the term so characters such as '&', '#', '+' or spaces
            // cannot break or alter the query string.
            .get<User[]>(`/api/users?search=${encodeURIComponent(term)}`)
            .pipe(catchError(() => of([])))
    )
  );
}
import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs';
/** Shape of the application state held by the state service. */
interface AppState {
  /** Current user, or `null` when no user is set. */
  user: User | null;
  /** Loading flag. */
  loading: boolean;
  /** Error message, or `null` when there is none. */
  error: string | null;
}
/**
 * Minimal observable store around a single `BehaviorSubject<AppState>`.
 *
 * Consumers read slices through the exposed selector streams and write
 * through `setState()`, which shallow-merges a partial update.
 */
@Injectable({ providedIn: 'root' })
export class StateService {
  private state = new BehaviorSubject<AppState>({ user: null, loading: false, error: null });

  // Selectors — each emits only when its slice changes.
  user$ = this.select((s) => s.user);
  loading$ = this.select((s) => s.loading);
  error$ = this.select((s) => s.error);
  isAuthenticated$ = this.user$.pipe(map((u) => Boolean(u)));

  /** Projects the state through `selector` and drops consecutive duplicates. */
  private select<T>(selector: (state: AppState) => T): Observable<T> {
    const projected$ = this.state.pipe(map(selector));
    return projected$.pipe(distinctUntilChanged());
  }

  /** Shallow-merges `partial` over the current state and publishes the result. */
  setState(partial: Partial<AppState>) {
    const merged = { ...this.state.getValue(), ...partial };
    this.state.next(merged);
  }

  /** Returns a synchronous snapshot of the current state. */
  getState(): AppState {
    return this.state.getValue();
  }
}
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
Observable,
retry,
share,
filter,
map,
takeUntil,
Subject,
} from 'rxjs';
/** Envelope for every frame exchanged over the socket. */
interface Message {
  /** Discriminator used to route the message to interested listeners. */
  type: string;
  /** Message body; left as `unknown` so consumers must narrow or cast it. */
  payload: unknown;
}
/**
 * Thin wrapper around an RxJS `WebSocketSubject` that exchanges `Message`
 * envelopes and lets callers listen for a specific message type.
 */
@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private socket$: WebSocketSubject<Message> | null = null;
  private destroy$ = new Subject<void>();

  /**
   * Opens the socket (if none exists yet) and returns its message stream.
   *
   * The socket is created only on the first call; while one exists, `url` is
   * ignored on later calls. The returned stream retries up to 5 times with a
   * 3000 ms delay and ends when `disconnect()` is called.
   */
  connect(url: string): Observable<Message> {
    if (!this.socket$) {
      this.socket$ = webSocket<Message>(url);
    }
    return this.socket$.pipe(
      retry({ delay: 3000, count: 5 }),
      share(),
      takeUntil(this.destroy$)
    );
  }

  /**
   * Streams the payloads of messages whose `type` matches.
   *
   * The payload is cast to `T` without validation.
   *
   * @throws Error when called before `connect()` (or after `disconnect()`),
   *   since there is no socket to listen on.
   */
  on<T>(type: string): Observable<T> {
    const socket$ = this.socket$;
    if (!socket$) {
      // Fail with a descriptive message instead of an opaque null-dereference.
      throw new Error('WebSocketService.on() called before connect()');
    }
    return socket$.pipe(
      filter((msg) => msg.type === type),
      map((msg) => msg.payload as T)
    );
  }

  /** Sends a message; silently does nothing when no socket exists. */
  send(type: string, payload: unknown) {
    this.socket$?.next({ type, payload });
  }

  /**
   * Ends streams returned by `connect()`, completes the socket subject and
   * drops the reference so a later `connect()` creates a new socket.
   */
  disconnect() {
    this.destroy$.next();
    this.socket$?.complete();
    this.socket$ = null;
  }
}
import {
combineLatest,
forkJoin,
merge,
concat,
race,
zip,
switchMap,
map,
} from 'rxjs';
/**
 * Examples of combining several observables for a dashboard view.
 *
 * NOTE(review): `http`, `userService`, `settingsService` and the three
 * `*Events$` streams are referenced but not declared in this snippet —
 * they are assumed to be injected or defined elsewhere.
 */
@Injectable({ providedIn: 'root' })
export class DashboardService {
  /**
   * Issues the three requests in parallel and emits a single object once
   * all of them have completed.
   */
  loadDashboard() {
    return forkJoin({
      user: this.http.get<User>('/api/user'),
      stats: this.http.get<Stats>('/api/stats'),
      notifications: this.http.get<Notification[]>('/api/notifications'),
    });
  }

  /**
   * View model derived from the latest user and theme values.
   * Re-emits whenever either source emits.
   */
  dashboardData$ = combineLatest({
    user: this.userService.user$,
    theme: this.settingsService.theme$,
  }).pipe(
    map(({ user, theme }) => {
      const name = user?.name;
      return {
        // Avoid rendering "Hello, undefined" when there is no user or name.
        greeting: name ? `Hello, ${name}` : 'Hello',
        isDark: theme === 'dark',
      };
    })
  );

  /** Single stream interleaving the emissions of all three event sources. */
  allEvents$ = merge(
    this.userEvents$,
    this.systemEvents$,
    this.notificationEvents$
  );
}
import {
catchError,
retry,
retryWhen,
delay,
throwError,
EMPTY,
of,
} from 'rxjs';
// Basic retry: resubscribe up to 3 times, then log the error and emit a fallback value.
this.http.get('/api/data').pipe(
  retry(3),
  catchError((err) => {
    console.error('Request failed:', err);
    return of(fallbackData);
  })
);

// Retry with a growing pause: wait 1000 ms times the attempt number before
// each of up to 3 retries, then surface a single summary error.
this.http.get('/api/data').pipe(
  retry({
    count: 3,
    delay: (_err, attempt) => {
      console.log(`Retry ${attempt}`);
      const waitMs = 1000 * attempt;
      return of(null).pipe(delay(waitMs));
    },
  }),
  catchError(() => throwError(() => new Error('Max retries exceeded')))
);

// Fallback chain: try the primary endpoint, then the fallback endpoint,
// and finally settle for a default value.
this.http.get('/api/primary').pipe(
  catchError(() => this.http.get('/api/fallback')),
  catchError(() => of(defaultValue))
);
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';
/**
 * Demonstrates the `takeUntil(destroy$)` unsubscribe pattern: every
 * subscription opened in `ngOnInit` is ended when the component is destroyed.
 *
 * NOTE(review): `dataService`, `userService`, `data` and `user` are not
 * declared in this snippet — assumed to be defined elsewhere.
 */
@Component({...})
export class MyComponent implements OnInit, OnDestroy {
  // Emits once in ngOnDestroy to end every stream piped through takeUntil.
  private destroy$ = new Subject<void>();

  /** Subscribes to the data and user streams for the component's lifetime. */
  ngOnInit() {
    this.dataService.data$
      .pipe(takeUntil(this.destroy$))
      .subscribe((data) => {
        this.data = data;
      });

    this.userService.user$
      .pipe(takeUntil(this.destroy$))
      .subscribe((user) => {
        this.user = user;
      });
  }

  /** Signals all `takeUntil` subscriptions to end, then closes the notifier. */
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
| Category | Operators |
|----------|-----------|
| Creation | of, from, interval, timer, fromEvent |
| Transform | map, switchMap, mergeMap, concatMap, exhaustMap |
| Filter | filter, take, takeUntil, first, distinctUntilChanged |
| Combine | combineLatest, forkJoin, merge, concat, zip |
| Error | catchError, retry, throwError |
| Multicast | share, shareReplay, publish |
development
Model documentation skill for generating model cards following Google's model card framework.
development
MLflow integration skill for experiment tracking, model registry, and artifact management. Enables LLMs to log experiments, compare runs, manage model lifecycle, and retrieve artifacts through the MLflow API.
data-ai
LIME-based local explanation skill for individual predictions across tabular, text, and image data.
devops
Kubeflow Pipelines skill for ML workflow orchestration, component management, and Kubernetes-native ML.