Agent skill
rxjs
RxJS reactive programming patterns including operators, error handling, multicasting, and Angular integration.
Install this agent skill in your project
npx add-skill https://github.com/a5c-ai/babysitter/tree/main/library/specializations/web-development/skills/rxjs
RxJS Skill
Expert assistance for implementing reactive programming with RxJS in Angular and JavaScript applications.
Capabilities
- Create and compose observables
- Apply transformation and filtering operators
- Handle errors and retries
- Implement multicasting patterns
- Manage subscriptions and memory
- Integrate with Angular HTTP and forms
Usage
Invoke this skill when you need to:
- Handle async data streams
- Compose complex data pipelines
- Implement real-time features
- Manage concurrent requests
- Handle WebSocket streams
Inputs
| Parameter | Type | Required | Description |
|---|---|---|---|
| useCase | string | Yes | One of: http, websocket, events, state |
| operators | array | No | Specific operators needed |
| errorHandling | boolean | No | Include error handling |
Configuration Example
{
  "useCase": "http",
  "operators": ["switchMap", "debounceTime", "catchError"],
  "errorHandling": true
}
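The input table and configuration example above can be mirrored as a type with a small runtime check. This is only a sketch: the names `UseCase`, `SkillInput`, and `parseSkillInput` are hypothetical and are not defined by the skill itself.

```typescript
// Hypothetical types mirroring the Inputs table; the skill does not define these names.
type UseCase = 'http' | 'websocket' | 'events' | 'state';

interface SkillInput {
  useCase: UseCase;        // required
  operators?: string[];    // optional: specific operators needed
  errorHandling?: boolean; // optional: include error handling
}

const USE_CASES: readonly UseCase[] = ['http', 'websocket', 'events', 'state'];

// Validate an untyped JSON value against the table and return a typed input.
function parseSkillInput(raw: unknown): SkillInput {
  if (typeof raw !== 'object' || raw === null) {
    throw new Error('input must be an object');
  }
  const { useCase, operators, errorHandling } = raw as Record<string, unknown>;
  if (typeof useCase !== 'string' || !USE_CASES.includes(useCase as UseCase)) {
    throw new Error(`useCase must be one of: ${USE_CASES.join(', ')}`);
  }
  if (
    operators !== undefined &&
    !(Array.isArray(operators) && operators.every((op) => typeof op === 'string'))
  ) {
    throw new Error('operators must be an array of strings');
  }
  if (errorHandling !== undefined && typeof errorHandling !== 'boolean') {
    throw new Error('errorHandling must be a boolean');
  }
  return {
    useCase: useCase as UseCase,
    operators: operators as string[] | undefined,
    errorHandling: errorHandling as boolean | undefined,
  };
}

// The configuration example from above, parsed and checked.
const input = parseSkillInput(
  JSON.parse(
    '{"useCase":"http","operators":["switchMap","debounceTime","catchError"],"errorHandling":true}'
  )
);
```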
Observable Patterns
HTTP with Operators
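import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import {
  Subject,
  catchError,
  debounceTime,
  distinctUntilChanged,
  switchMap,
  retry,
  shareReplay,
  of,
} from 'rxjs';

// User is assumed to be an application model defined elsewhere.

@Injectable({ providedIn: 'root' })
export class SearchService {
  private http = inject(HttpClient);
  private searchTerms = new Subject<string>();

  // Shared cache with replay. catchError sits after shareReplay so that a
  // failed request is not cached and a later subscriber triggers a new attempt.
  private users$ = this.http.get<User[]>('/api/users').pipe(
    retry(3),
    shareReplay(1),
    catchError(() => of([]))
  );

  // Push a new search term into the pipeline.
  search(term: string) {
    this.searchTerms.next(term);
  }

  results$ = this.searchTerms.pipe(
    debounceTime(300), // wait for a 300 ms pause in typing
    distinctUntilChanged(), // skip a term identical to the previous one
    switchMap((term) =>
      // A new term cancels the request that is still in flight.
      term.length < 2
        ? of([])
        : this.http
            // Passing the term via params lets HttpClient URL-encode it.
            .get<User[]>('/api/users', { params: { search: term } })
            .pipe(catchError(() => of([])))
    )
  );
}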
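The cancellation behaviour of `switchMap` in the search pipeline above can be observed without Angular. The sketch below assumes a hypothetical `fakeSearch` function in place of `HttpClient.get`, answers requests by hand through a `respond` map, and leaves out `debounceTime` so that everything runs synchronously.

```typescript
import { Observable, Subject, distinctUntilChanged, of, switchMap } from 'rxjs';

// Hypothetical stand-ins for the server: respond holds one "answer" callback
// per pending request, cancelled records requests that were unsubscribed early.
const respond = new Map<string, (results: string[]) => void>();
const cancelled: string[] = [];

function fakeSearch(term: string): Observable<string[]> {
  return new Observable<string[]>((subscriber) => {
    let finished = false;
    respond.set(term, (results) => {
      finished = true;
      subscriber.next(results);
      subscriber.complete();
    });
    // Teardown runs on completion and on unsubscribe; only the latter is a cancel.
    return () => {
      if (!finished) cancelled.push(term);
    };
  });
}

const terms = new Subject<string>();
const received: string[][] = [];

terms
  .pipe(
    distinctUntilChanged(),
    switchMap((term) => (term.length < 2 ? of<string[]>([]) : fakeSearch(term)))
  )
  .subscribe((results) => received.push(results));

terms.next('an'); // starts request "an"
terms.next('ann'); // switchMap unsubscribes from "an" and starts "ann"
respond.get('an')!(['Andy']); // late answer for "an" is dropped
respond.get('ann')!(['Anna', 'Anne']); // answer for the latest term is delivered

// received is now [['Anna', 'Anne']] and cancelled is ['an']
```

Only the result for the latest term reaches the subscriber, which is why this operator suits typeahead-style reads.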
State Management Pattern
import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs';

// User is assumed to be an application model defined elsewhere.
interface AppState {
  user: User | null;
  loading: boolean;
  error: string | null;
}

@Injectable({ providedIn: 'root' })
export class StateService {
  private state = new BehaviorSubject<AppState>({
    user: null,
    loading: false,
    error: null,
  });

  // Selectors
  user$ = this.select((state) => state.user);
  loading$ = this.select((state) => state.loading);
  error$ = this.select((state) => state.error);
  isAuthenticated$ = this.user$.pipe(map((user) => !!user));

  // Project a slice of the state and emit only when that slice changes.
  private select<T>(selector: (state: AppState) => T): Observable<T> {
    return this.state.pipe(map(selector), distinctUntilChanged());
  }

  // Merge a partial update into the current state.
  setState(partial: Partial<AppState>) {
    this.state.next({ ...this.state.value, ...partial });
  }

  // Synchronous snapshot of the current state.
  getState(): AppState {
    return this.state.value;
  }
}
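To show what the selectors buy you, here is a framework-free sketch of the same idea. `DemoState` and the free functions `select` and `setState` are hypothetical simplifications of the service above.

```typescript
import { BehaviorSubject, Observable, distinctUntilChanged, map } from 'rxjs';

// Hypothetical reduced state shape for the demo (no user field).
interface DemoState {
  loading: boolean;
  error: string | null;
}

const state = new BehaviorSubject<DemoState>({ loading: false, error: null });

// Same selector idea as StateService.select: project a slice, drop repeats.
function select<T>(selector: (s: DemoState) => T): Observable<T> {
  return state.pipe(map(selector), distinctUntilChanged());
}

function setState(partial: Partial<DemoState>): void {
  state.next({ ...state.value, ...partial });
}

const loadingLog: boolean[] = [];
select((s) => s.loading).subscribe((loading) => loadingLog.push(loading));

setState({ loading: true }); // loading changed: emits true
setState({ error: 'timeout' }); // loading unchanged: the selector stays quiet
setState({ loading: false }); // loading changed: emits false

// loadingLog is now [false, true, false]
```

`distinctUntilChanged` compares with `===` by default, so a selector that builds a new object or array on every call will still emit on each state update unless a custom comparator is passed.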
WebSocket Stream
import { Injectable } from '@angular/core';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  Observable,
  retry,
  share,
  filter,
  map,
  takeUntil,
  Subject,
} from 'rxjs';

interface Message {
  type: string;
  payload: unknown;
}

@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private socket$: WebSocketSubject<Message> | null = null;
  private destroy$ = new Subject<void>();

  // Open the socket lazily and reuse it across calls.
  connect(url: string): Observable<Message> {
    if (!this.socket$) {
      this.socket$ = webSocket<Message>(url);
    }
    return this.socket$.pipe(
      retry({ delay: 3000, count: 5 }), // reconnect up to 5 times, 3 s apart
      share(),
      takeUntil(this.destroy$)
    );
  }

  // Stream the payloads of messages with the given type.
  on<T>(type: string): Observable<T> {
    if (!this.socket$) {
      throw new Error('on() called before connect()');
    }
    return this.socket$.pipe(
      filter((msg) => msg.type === type),
      map((msg) => msg.payload as T) // unchecked cast: the payload is not validated
    );
  }

  send(type: string, payload: unknown) {
    this.socket$?.next({ type, payload });
  }

  disconnect() {
    this.destroy$.next();
    this.socket$?.complete();
    this.socket$ = null;
  }
}
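The type-based routing done by `on()` can be tried without a server. In this sketch a plain `Subject` stands in for the `WebSocketSubject`, and the message types `chat`, `price`, and `presence` are made up for illustration.

```typescript
import { Observable, Subject, filter, map } from 'rxjs';

interface Message {
  type: string;
  payload: unknown;
}

// A plain Subject plays the role of the socket so that no server is needed.
const socket$ = new Subject<Message>();

// Same shape as WebSocketService.on: filter by type, then unwrap the payload.
function on<T>(type: string): Observable<T> {
  return socket$.pipe(
    filter((msg) => msg.type === type),
    map((msg) => msg.payload as T)
  );
}

const chat: string[] = [];
const prices: number[] = [];
on<string>('chat').subscribe((text) => chat.push(text));
on<number>('price').subscribe((value) => prices.push(value));

socket$.next({ type: 'chat', payload: 'hello' });
socket$.next({ type: 'price', payload: 101.5 });
socket$.next({ type: 'presence', payload: { user: 'a' } }); // no listener: ignored
socket$.next({ type: 'chat', payload: 'bye' });

// chat is now ['hello', 'bye'] and prices is [101.5]
```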
Combining Streams
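import { Injectable, inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { combineLatest, forkJoin, merge, map } from 'rxjs';

// User, Stats, Notification, UserService and SettingsService are assumed to be
// defined elsewhere in the application.

@Injectable({ providedIn: 'root' })
export class DashboardService {
  private http = inject(HttpClient);
  private userService = inject(UserService);
  private settingsService = inject(SettingsService);

  // Parallel requests - wait for all
  loadDashboard() {
    return forkJoin({
      user: this.http.get<User>('/api/user'),
      stats: this.http.get<Stats>('/api/stats'),
      notifications: this.http.get<Notification[]>('/api/notifications'),
    });
  }

  // Latest values from multiple streams
  dashboardData$ = combineLatest({
    user: this.userService.user$,
    theme: this.settingsService.theme$,
  }).pipe(
    map(({ user, theme }) => ({
      // Avoid rendering "Hello, undefined" while no user is loaded.
      greeting: user ? `Hello, ${user.name}` : 'Hello',
      isDark: theme === 'dark',
    }))
  );

  // Merge multiple event streams. userEvents$, systemEvents$ and
  // notificationEvents$ are assumed to be observables declared earlier in this class.
  allEvents$ = merge(
    this.userEvents$,
    this.systemEvents$,
    this.notificationEvents$
  );
}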
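The difference between `forkJoin` and `combineLatest` is easiest to see with small synchronous sources. The following sketch uses made-up values and `BehaviorSubject`s in place of the service streams.

```typescript
import { BehaviorSubject, combineLatest, forkJoin, of } from 'rxjs';

// forkJoin: emits once, after every source completes, with each source's last value.
const joined: Array<{ count: number; label: string }> = [];
forkJoin({ count: of(1, 2, 3), label: of('done') }).subscribe((v) => joined.push(v));
// joined is now [{ count: 3, label: 'done' }]

// combineLatest: emits whenever any source emits, once all have emitted at least once.
const user$ = new BehaviorSubject<string>('Ada');
const theme$ = new BehaviorSubject<'light' | 'dark'>('light');
const combined: string[] = [];
combineLatest({ user: user$, theme: theme$ }).subscribe(({ user, theme }) =>
  combined.push(`${user}/${theme}`)
);

theme$.next('dark');
user$.next('Grace');
// combined is now ['Ada/light', 'Ada/dark', 'Grace/dark']
```

As a rule of thumb, `forkJoin` fits one-shot requests that complete, while `combineLatest` fits long-lived state streams.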
Error Handling Patterns
import { catchError, retry, throwError, timer, of } from 'rxjs';

// fallbackData and defaultValue are placeholders for application-specific values.

// Simple retry: up to 3 resubscriptions, then fall back
this.http.get('/api/data').pipe(
  retry(3),
  catchError((error) => {
    console.error('Request failed:', error);
    return of(fallbackData);
  })
);

// Retry with a linearly increasing delay (1 s, 2 s, 3 s)
this.http.get('/api/data').pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => {
      console.log(`Retry ${retryCount}`);
      return timer(1000 * retryCount);
    },
  }),
  catchError(() => throwError(() => new Error('Max retries exceeded')))
);

// Error recovery: try a fallback endpoint, then a default value
this.http.get('/api/primary').pipe(
  catchError(() => this.http.get('/api/fallback')),
  catchError(() => of(defaultValue))
);
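How `retry` and `catchError` interact can be checked with a source that fails a known number of times. The sketch below uses hypothetical `flaky$` and `alwaysFails$` sources built with `defer` in place of HTTP calls, and no delay so that it runs synchronously.

```typescript
import { catchError, defer, of, retry, throwError } from 'rxjs';

// Hypothetical flaky source: fails on the first two subscriptions, then succeeds.
let attempts = 0;
const flaky$ = defer(() => {
  attempts += 1;
  return attempts < 3
    ? throwError(() => new Error(`attempt ${attempts} failed`))
    : of('payload');
});

const results: string[] = [];
flaky$
  .pipe(
    retry(3), // up to 3 resubscriptions after the first failure
    catchError(() => of('fallback')) // only reached if every retry fails
  )
  .subscribe((value) => results.push(value));
// results is now ['payload'] and attempts is 3

// A source that always fails uses up the retries and falls through to catchError.
let failedAttempts = 0;
const alwaysFails$ = defer(() => {
  failedAttempts += 1;
  return throwError(() => new Error('down'));
});

const fallbackResults: string[] = [];
alwaysFails$
  .pipe(retry(3), catchError(() => of('fallback')))
  .subscribe((value) => fallbackResults.push(value));
// fallbackResults is now ['fallback'] and failedAttempts is 4 (1 try + 3 retries)
```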
Subscription Management
import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subject, takeUntil } from 'rxjs';

// dataService and userService are assumed to be injected services.
@Component({...})
export class MyComponent implements OnInit, OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    this.dataService.data$
      .pipe(takeUntil(this.destroy$))
      .subscribe((data) => {
        this.data = data;
      });

    this.userService.user$
      .pipe(takeUntil(this.destroy$))
      .subscribe((user) => {
        this.user = user;
      });
  }

  ngOnDestroy() {
    // Emit once so that every takeUntil(this.destroy$) pipeline completes.
    this.destroy$.next();
    this.destroy$.complete();
  }
}
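The effect of the `destroy$` notifier can be verified outside a component. This sketch assumes a hypothetical `data$` subject in place of a service stream and calls the notifier by hand where Angular would call `ngOnDestroy`.

```typescript
import { Subject, takeUntil } from 'rxjs';

const data$ = new Subject<number>(); // stands in for a long-lived service stream
const destroy$ = new Subject<void>(); // the notifier fired in ngOnDestroy

const seen: number[] = [];
let completed = false;

data$.pipe(takeUntil(destroy$)).subscribe({
  next: (value) => {
    seen.push(value);
  },
  complete: () => {
    completed = true;
  },
});

data$.next(1);
data$.next(2);

// Equivalent of ngOnDestroy(): the notifier fires and takeUntil completes the stream.
destroy$.next();
destroy$.complete();

data$.next(3); // ignored: the subscription has already been torn down

// seen is now [1, 2] and completed is true
```

Placing `takeUntil` last in the `pipe` is the usual recommendation, so that operators after it cannot keep inner subscriptions alive.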
Operator Reference
| Category | Operators |
|---|---|
| Creation | of, from, interval, timer, fromEvent |
| Transform | map, switchMap, mergeMap, concatMap, exhaustMap |
| Filter | filter, take, takeUntil, first, distinctUntilChanged |
| Combine | combineLatest, forkJoin, merge, concat, zip |
| Error | catchError, retry, throwError |
| Multicast | share, shareReplay, connect (publish is deprecated as of RxJS 7) |
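The four flattening operators in the Transform row differ in what they do when a new outer value arrives while an inner observable is still active. The sketch below runs the same scripted sequence through each one. The `run` helper and the `A`/`B` streams are hypothetical, and the inner streams are hot subjects, so values emitted before an operator subscribes are missed.

```typescript
import {
  Observable,
  OperatorFunction,
  Subject,
  concatMap,
  exhaustMap,
  mergeMap,
  switchMap,
} from 'rxjs';

type Id = 'A' | 'B';
type Flatten = (project: (id: Id) => Observable<string>) => OperatorFunction<Id, string>;

// Run one fixed scenario through the given flattening operator.
function run(flatten: Flatten): string[] {
  const outer = new Subject<Id>();
  const inner = { A: new Subject<string>(), B: new Subject<string>() };
  const out: string[] = [];
  outer.pipe(flatten((id) => inner[id])).subscribe((v) => out.push(v));

  outer.next('A'); // inner A starts
  outer.next('B'); // B arrives while A is still active
  inner.B.next('B0');
  inner.A.next('A1');
  inner.A.complete();
  inner.B.next('B1');
  inner.B.complete();
  return out;
}

const merged = run((project) => mergeMap(project)); // runs A and B concurrently
const switched = run((project) => switchMap(project)); // drops A when B arrives
const queued = run((project) => concatMap(project)); // starts B only after A completes
const exhausted = run((project) => exhaustMap(project)); // ignores B while A is active

// merged    is ['B0', 'A1', 'B1']
// switched  is ['B0', 'B1']
// queued    is ['A1', 'B1']
// exhausted is ['A1']
```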
Best Practices
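- Unsubscribe from long-lived streams, or end them with takeUntil, to avoid memory leaks
- Use shareReplay to cache and share the latest value, such as a one-off HTTP response
- Prefer switchMap for reads where only the latest request matters (search, typeahead); use concatMap or exhaustMap for writes that must not be cancelled
- Handle errors at the level that can recover: inside the inner observable to keep the outer stream alive, at the subscriber for final reporting
- Use subjects sparingly; prefer composing existing streams and expose subjects as read-only observables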
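The caching advice can be illustrated by counting how often the source really runs. This is a minimal sketch, assuming a hypothetical `config$` stream whose `defer` body plays the role of an expensive request.

```typescript
import { defer, of, shareReplay } from 'rxjs';

// Hypothetical expensive call; the counter records how often it really runs.
let calls = 0;
const config$ = defer(() => {
  calls += 1;
  return of({ apiUrl: '/api', retries: 3 });
}).pipe(shareReplay(1));

const seenUrls: string[] = [];
config$.subscribe((c) => seenUrls.push(c.apiUrl)); // triggers the call
config$.subscribe((c) => seenUrls.push(c.apiUrl)); // served from the replay buffer

// calls is now 1 and seenUrls is ['/api', '/api']
```

With the numeric form `shareReplay(1)` the source subscription is kept even after all subscribers leave. For long-lived sources, `shareReplay({ bufferSize: 1, refCount: true })` releases the source when the last subscriber unsubscribes.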
Target Processes
- angular-enterprise-development
- real-time-features
- state-management-setup
- data-streaming