Agent skill

rxjs-patterns

RxJS reactive programming patterns for Angular applications. Use when implementing observables, operators, error handling, memory management, subscription cleanup, or advanced reactive patterns. Covers operators, multicasting, backpressure, and integration with Angular Signals.

Stars 163
Forks 31

Install this agent skill to your Project

npx add-skill https://github.com/majiayu000/claude-skill-registry/tree/main/skills/data/rxjs-patterns

SKILL.md

RxJS Patterns

Expert guidance for reactive programming with RxJS in Angular applications, focusing on best practices, common patterns, and performance optimization.

When to Use This Skill

Activate this skill when you need to:

  • Create and manage observables effectively
  • Chain RxJS operators for data transformation
  • Handle errors in reactive streams
  • Prevent memory leaks from subscriptions
  • Implement debouncing, throttling, or buffering
  • Share observables with multiple subscribers
  • Integrate RxJS with Angular Signals
  • Optimize reactive data flows
  • Implement advanced patterns (retry, polling, caching)

Core Operators

Transformation

typescript
// map - Transform each value
source$.pipe(
  map(user => user.name)
)

// mergeMap/switchMap/concatMap/exhaustMap
// Choose based on concurrency needs:
// - switchMap: Cancel previous, use for search
// - mergeMap: Run concurrently, use for independent operations
// - concatMap: Queue sequentially, use for ordered operations
// - exhaustMap: Ignore new while running, use for save/submit

// Search example with switchMap
searchTerm$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(term => this.searchService.search(term))
)

// Save example with exhaustMap
saveButton$.pipe(
  exhaustMap(() => this.saveService.save(data))
)

Filtering

typescript
// filter - Emit only matching values
source$.pipe(
  filter(user => user.age >= 18)
)

// distinctUntilChanged - Skip duplicate consecutive values
input$.pipe(
  distinctUntilChanged()
)

// take/takeUntil - Limit emissions
source$.pipe(
  take(5) // Take first 5
)

source$.pipe(
  takeUntil(destroy$) // Unsubscribe pattern
)

// debounceTime/throttleTime - Rate limiting
input$.pipe(
  debounceTime(300) // Wait 300ms after last input
)

click$.pipe(
  throttleTime(1000) // Emit at most once per second
)

Combination

typescript
// combineLatest - Emit when any observable emits
combineLatest([user$, settings$]).pipe(
  map(([user, settings]) => ({ user, settings }))
)

// forkJoin - Emit when all complete
forkJoin({
  user: getUserById(id),
  posts: getUserPosts(id),
  comments: getUserComments(id)
}).subscribe(({ user, posts, comments }) => {
  // All loaded
})

// merge - Merge multiple observables
merge(click$, hover$, focus$).subscribe()

// zip - Pair emissions by index
zip(numbers$, letters$).pipe(
  map(([num, letter]) => `${num}${letter}`)
)

Error Handling

catchError

typescript
// Handle errors gracefully
this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]); // Return fallback value
  })
)

// Re-throw after logging
this.http.get('/api/data').pipe(
  catchError(error => {
    this.logger.error(error);
    return throwError(() => new Error('Failed to load data'));
  })
)

Retry Logic

typescript
// retry - Retry on error
this.http.get('/api/data').pipe(
  retry(3),
  catchError(error => of([]))
)

// retryWhen - Advanced retry with delay
this.http.get('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, err) => {
      if (retryCount >= 3) {
        throw err;
      }
      return retryCount + 1;
    }, 0),
    delay(1000) // Wait 1s between retries
  ))
)

Memory Management

Subscription Cleanup

typescript
// ❌ BAD - Memory leak
export class BadComponent {
  ngOnInit() {
    this.dataService.getData().subscribe(data => {
      this.data = data;
    });
  }
}

// ✅ GOOD - Manual cleanup
export class GoodComponent implements OnDestroy {
  private subscription = new Subscription();
  
  ngOnInit() {
    this.subscription.add(
      this.dataService.getData().subscribe(data => {
        this.data = data;
      })
    );
  }
  
  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

// ✅ BETTER - takeUntil pattern
export class BetterComponent implements OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    this.dataService.getData().pipe(
      takeUntil(this.destroy$)
    ).subscribe(data => {
      this.data = data;
    });
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// ✅ BEST - toSignal (Angular 16+)
export class BestComponent {
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );
}

Sharing Observables

typescript
// ❌ BAD - Multiple HTTP requests
const data$ = this.http.get('/api/data');
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // Second request!

// ✅ GOOD - Share with shareReplay
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
data$.subscribe(x => console.log(x));
data$.subscribe(y => console.log(y)); // Uses cached result

Integration with Angular Signals

toSignal

typescript
// Convert observable to signal
export class Component {
  private dataService = inject(DataService);
  
  // Automatic subscription management
  data = toSignal(
    this.dataService.getData(),
    { initialValue: [] }
  );
  
  // Use in template
  template: `{{ data().length }} items`
}

toObservable

typescript
// Convert signal to observable
export class Component {
  searchTerm = signal('');
  
  results$ = toObservable(this.searchTerm).pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term => this.searchService.search(term))
  );
  
  results = toSignal(this.results$, { initialValue: [] });
}

Advanced Patterns

Polling

typescript
// Poll every 5 seconds
interval(5000).pipe(
  startWith(0),
  switchMap(() => this.http.get('/api/status')),
  takeUntil(this.destroy$)
).subscribe(status => {
  this.status = status;
});

Caching with Expiration

typescript
@Injectable({ providedIn: 'root' })
export class CachedDataService {
  private cache$ = new ReplaySubject<Data[]>(1);
  private cacheAge = 0;
  private readonly CACHE_DURATION = 5 * 60 * 1000; // 5 minutes
  
  getData(): Observable<Data[]> {
    const now = Date.now();
    
    if (now - this.cacheAge > this.CACHE_DURATION) {
      this.http.get<Data[]>('/api/data').subscribe(data => {
        this.cache$.next(data);
        this.cacheAge = now;
      });
    }
    
    return this.cache$.asObservable();
  }
}

Optimistic Updates

typescript
updateItem(id: string, changes: Partial<Item>): Observable<Item> {
  // Optimistically update UI
  const optimisticItem = { ...this.currentItem, ...changes };
  this.items$.next(this.items$.value.map(item => 
    item.id === id ? optimisticItem : item
  ));
  
  // Send to server
  return this.http.put<Item>(`/api/items/${id}`, changes).pipe(
    tap(serverItem => {
      // Update with server response
      this.items$.next(this.items$.value.map(item => 
        item.id === id ? serverItem : item
      ));
    }),
    catchError(error => {
      // Rollback on error
      this.items$.next(this.items$.value.map(item => 
        item.id === id ? this.currentItem : item
      ));
      return throwError(() => error);
    })
  );
}

Best Practices

  • ✅ Always unsubscribe or use takeUntil
  • ✅ Use toSignal for automatic cleanup
  • ✅ Share expensive observables with shareReplay
  • ✅ Choose the right flattening operator (switchMap, mergeMap, etc.)
  • ✅ Handle errors with catchError
  • ✅ Use async pipe in templates when possible
  • ❌ Don't subscribe in services (return observables)
  • ❌ Don't manually create subscriptions unnecessarily
  • ❌ Don't forget to complete subjects

References

Didn't find tool you were looking for?

Be as detailed as possible for better results