Inicio / Angular / Angular Completo: De Cero a Experto / RxJS y Programación Reactiva

RxJS y Programación Reactiva

Aprende Observables, Subjects, operadores pipe (map, switchMap, combineLatest, forkJoin) y cómo integrar RxJS en componentes Angular.


RxJS y Programación Reactiva

RxJS (Reactive Extensions for JavaScript) es la librería de programación reactiva que Angular usa internamente en todas partes: HttpClient, Router, Formularios Reactivos, y más. Entender RxJS es esencial para ser un desarrollador Angular eficiente.


Conceptos fundamentales

Observable

Un Observable es como un flujo de datos en el tiempo. Puede emitir:

  • Cero o más valores en cualquier momento
  • Un error (y se termina)
  • Una señal de completado
Timeline: ----1----2----3----4---|  (completado)
Timeline: ----a----b----X       (error en X)

Observer

Un Observer es el que consume los valores del Observable:

import { Observable, Observer } from 'rxjs';

const observable$ = new Observable<number>((subscriber) => {
  subscriber.next(1);         // Emite valor
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();      // Señal de completado
  // subscriber.error(new Error('algo falló'));  // O un error
});

// Observar con un observer completo
const observer: Observer<number> = {
  next: (valor) => console.log('Valor:', valor),
  error: (err) => console.error('Error:', err),
  complete: () => console.log('¡Completado!')
};

const suscripcion = observable$.subscribe(observer);

// Cancelar la suscripción (importante para evitar memory leaks)
suscripcion.unsubscribe();

Subscription

import { interval } from 'rxjs';

// Emite 0, 1, 2, 3... cada segundo
const contador$ = interval(1000);

const sub = contador$.subscribe(n => console.log(n));

// Cancelar después de 5 segundos
setTimeout(() => {
  sub.unsubscribe();
  console.log('Cancelado');
}, 5000);

Creación de Observables

import {
  of, from, interval, timer, fromEvent,
  Subject, BehaviorSubject, ReplaySubject, EMPTY, NEVER
} from 'rxjs';

// of — Valores estáticos
const numeros$ = of(1, 2, 3);          // Emite 1, 2, 3 y completa

// from — Promesas, arrays, iterables
const array$ = from([10, 20, 30]);
const promesa$ = from(fetch('/api/datos').then(r => r.json()));

// interval — Temporizador periódico
const cada2segundos$ = interval(2000);  // 0, 1, 2... cada 2s

// timer — Retardo inicial + intervalo
const timer$ = timer(1000, 500);        // Empieza en 1s, luego cada 0.5s
const retardo$ = timer(3000);           // Emite 0 una vez después de 3s

// fromEvent — Eventos del DOM
const clicks$ = fromEvent(document, 'click');
const teclado$ = fromEvent<KeyboardEvent>(document, 'keydown');

// EMPTY — Completa inmediatamente sin emitir
EMPTY.subscribe({ complete: () => console.log('Vacío, completado') });

// NEVER — Nunca emite ni completa
NEVER.subscribe();  // infinito sin hacer nada

Subjects — Observables multidifusión

Los Subjects son Observables que también pueden emitir valores manualmente:

import { Subject, BehaviorSubject, ReplaySubject } from 'rxjs';

// Subject básico — solo reciben valores futuros
const eventos$ = new Subject<string>();

eventos$.subscribe(e => console.log('Suscriptor 1:', e));
eventos$.next('primer evento');    // Suscriptor 1: primer evento

eventos$.subscribe(e => console.log('Suscriptor 2:', e));
eventos$.next('segundo evento');   // Suscriptor 1 Y 2 reciben este
eventos$.complete();

// BehaviorSubject — Guarda el último valor (ideal para estado)
const usuario$ = new BehaviorSubject<string | null>(null);

// Un nuevo suscriptor recibe inmediatamente el valor actual
usuario$.subscribe(u => console.log('Usuario:', u));  // Usuario: null

usuario$.next('Ana');  // Usuario: Ana

// El valor actual siempre está disponible de forma sincrónica
console.log(usuario$.getValue());  // 'Ana'

// ReplaySubject — Guarda los N últimos valores
const historial$ = new ReplaySubject<string>(3);  // Guarda últimos 3

historial$.next('primero');
historial$.next('segundo');
historial$.next('tercero');
historial$.next('cuarto');

// Un nuevo suscriptor recibe los 3 últimos: 'segundo', 'tercero', 'cuarto'
historial$.subscribe(v => console.log(v));

Operadores pipe

Los operadores transforman, filtran y combinan Observables:

Transformación

import { of, from } from 'rxjs';
import { map, flatMap, switchMap, concatMap, mergeMap, scan, reduce } from 'rxjs/operators';

// map — Transforma cada valor
of(1, 2, 3).pipe(
  map(n => n * 2)
).subscribe(console.log);  // 2, 4, 6

// Transformar objetos
from(usuarios).pipe(
  map(u => ({ ...u, nombreCompleto: `${u.nombre} ${u.apellido}` }))
);

// switchMap — Cancela el Observable anterior (ideal para búsqueda en tiempo real)
busqueda$.pipe(
  debounceTime(300),
  distinctUntilChanged(),
  switchMap(termino => this.apiService.buscar(termino))
).subscribe(resultados => this.resultados = resultados);

// mergeMap — Ejecuta todos en paralelo
ids$.pipe(
  mergeMap(id => this.http.get(`/api/items/${id}`))
).subscribe(item => this.items.push(item));

// concatMap — Ejecuta en secuencia (espera a que termine el anterior)
solicitudes$.pipe(
  concatMap(solicitud => this.http.post('/api/procesar', solicitud))
).subscribe(console.log);

// exhaustMap — Ignora nuevas emisiones mientras el Observable interno está activo
// Ideal para prevenir doble envío de formularios o clicks repetidos
botonEnviar$.pipe(
  exhaustMap(() => this.http.post('/api/formulario', datos))
).subscribe(respuesta => console.log('Enviado:', respuesta));
// Si el usuario hace click varias veces mientras procesa, los clicks extra se IGNORAN

// Comparativa rápida:
// switchMap   → Cancela el anterior, emite el nuevo (búsqueda en tiempo real)
// mergeMap    → Paralelo, todos corren a la vez (cargar múltiples IDs)
// concatMap   → Secuencial, espera al anterior (procesar cola en orden)
// exhaustMap  → Ignora nuevos mientras hay uno activo (submit de formulario)

// scan — Acumulador (como reduce pero emite en cada paso)
of(1, 2, 3, 4, 5).pipe(
  scan((acum, valor) => acum + valor, 0)
).subscribe(console.log);  // 1, 3, 6, 10, 15

Filtrado

import { debounceTime, distinctUntilChanged, filter, take, takeUntil, skip, first } from 'rxjs/operators';

// filter — Solo pasa los valores que cumplen la condición
of(1, 2, 3, 4, 5, 6).pipe(
  filter(n => n % 2 === 0)
).subscribe(console.log);  // 2, 4, 6

// debounceTime — Espera X ms sin nuevos valores antes de emitir
input$.pipe(debounceTime(400));  // Espera 400ms después de la última tecla

// distinctUntilChanged — Solo emite si el valor cambió
of('a', 'a', 'b', 'b', 'c').pipe(
  distinctUntilChanged()
).subscribe(console.log);  // a, b, c

// take — Solo toma N valores y completa
interval(1000).pipe(take(5)).subscribe(console.log);  // 0, 1, 2, 3, 4

// takeUntil — Toma valores hasta que otro Observable emite
const parar$ = new Subject<void>();
interval(500).pipe(
  takeUntil(parar$)
).subscribe(console.log);

setTimeout(() => parar$.next(), 2000);  // Para a los 2 segundos

// first — Solo el primer valor (o el primero que cumple condición)
clicks$.pipe(first()).subscribe(console.log);  // Solo el primer click
clicks$.pipe(first(e => e.button === 2)).subscribe(console.log);  // Primer click derecho

Combinación

import { combineLatest, forkJoin, merge, zip, withLatestFrom } from 'rxjs';

// forkJoin — Como Promise.all: espera a que todos completen
forkJoin([
  this.http.get('/api/usuarios'),
  this.http.get('/api/productos'),
  this.http.get('/api/categorias')
]).subscribe(([usuarios, productos, categorias]) => {
  // Los tres han cargado
  this.inicializar(usuarios, productos, categorias);
});

// combineLatest — Emite cuando cualquiera emite (con el último valor de todos)
combineLatest([precio$, cantidad$]).pipe(
  map(([precio, cantidad]) => precio * cantidad)
).subscribe(total => this.total = total);

// merge — Une varios Observables, emite cuando cualquiera emite
merge(click$, teclado$).subscribe(evento => manejar(evento));

// zip — Emite cuando todos han emitido (pares 1-1)
zip(nombres$, edades$).pipe(
  map(([nombre, edad]) => ({ nombre, edad }))
);

// withLatestFrom — Combina con el último valor de otro Observable
boton$.pipe(
  withLatestFrom(formulario$)
).subscribe(([_click, datos]) => enviar(datos));

Manejo de errores

import { catchError, retry, retryWhen, throwError, EMPTY } from 'rxjs';
import { delay } from 'rxjs/operators';

// catchError — Captura el error y puede retornar un valor por defecto
this.http.get('/api/datos').pipe(
  catchError(error => {
    console.error('Error:', error);
    return of([]);  // Retorna array vacío en caso de error
    // return throwError(() => error);  // Re-lanza el error
    // return EMPTY;  // Completa sin emitir nada
  })
);

// retry — Reintenta N veces
this.http.get('/api/datos').pipe(
  retry(3)  // Reintenta hasta 3 veces antes de propagar el error
);

// retryWhen — Reintento con lógica personalizada
this.http.get('/api/datos').pipe(
  retryWhen(errors =>
    errors.pipe(
      delayWhen((_, intento) => timer(Math.pow(2, intento) * 1000))  // Backoff exponencial
    )
  )
);

RxJS en componentes Angular

Patrón: Gestionar suscripciones con takeUntilDestroyed

import { Component, OnInit, inject } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

@Component({ selector: 'app-demo', standalone: true, template: '' })
export class DemoComponent implements OnInit {
  private postsService = inject(PostsService);

  // takeUntilDestroyed cancela automáticamente cuando el componente se destruye
  ngOnInit(): void {
    this.postsService.obtenerTodos().pipe(
      takeUntilDestroyed()   // ← No necesitas ngOnDestroy
    ).subscribe(posts => this.posts = posts);
  }
}

Patrón: Búsqueda en tiempo real

import { Component, OnInit, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { debounceTime, distinctUntilChanged, switchMap, startWith } from 'rxjs/operators';
import { AsyncPipe } from '@angular/common';
import { Observable } from 'rxjs';

@Component({
  selector: 'app-buscador',
  standalone: true,
  imports: [ReactiveFormsModule, AsyncPipe],
  template: `
    <input [formControl]="campoBusqueda" placeholder="Buscar...">

    @if (resultados$ | async; as resultados) {
      @for (r of resultados; track r.id) {
        <div>{{ r.titulo }}</div>
      }
    }
  `
})
export class BuscadorComponent implements OnInit {
  private apiService = inject(ApiService);
  campoBusqueda = new FormControl('');
  resultados$!: Observable<any[]>;

  ngOnInit(): void {
    this.resultados$ = this.campoBusqueda.valueChanges.pipe(
      startWith(''),                        // Emite al iniciar con ''
      debounceTime(350),                    // Espera 350ms sin teclear
      distinctUntilChanged(),               // Solo si el valor cambió
      switchMap(termino =>                  // Cancela búsquedas anteriores
        this.apiService.buscar(termino ?? '')
      )
    );
  }
}

Signals vs RxJS

Angular 16+ introdujo Signals como alternativa más simple a RxJS para estado reactivo local:

import { Component, signal, computed, effect } from '@angular/core';

@Component({
  selector: 'app-contador',
  standalone: true,
  template: `
    <p>Contador: {{ contador() }}</p>
    <p>Doble: {{ doble() }}</p>
    <button (click)="incrementar()">+1</button>
  `
})
export class ContadorComponent {
  // Signal — valor reactivo
  contador = signal(0);

  // Computed signal — se recalcula automáticamente
  doble = computed(() => this.contador() * 2);

  constructor() {
    // Effect — se ejecuta cuando cambia una dependencia
    effect(() => {
      console.log(`El contador cambió a: ${this.contador()}`);
    });
  }

  incrementar(): void {
    this.contador.update(n => n + 1);     // Basado en valor anterior
    // this.contador.set(10);              // Establece valor directamente
  }
}

Interoperabilidad Signals ↔ RxJS

import { toObservable, toSignal } from '@angular/core/rxjs-interop';

// Signal → Observable
const contador = signal(0);
const contador$ = toObservable(contador);

// Observable → Signal
const datos$ = this.http.get<Data[]>('/api/datos');
const datos = toSignal(datos$, { initialValue: [] });
// Ahora puedes usar datos() en el template en lugar de async pipe

Resumen de operadores más usados

Operador Categoría Descripción
map Transformación Transforma cada valor
switchMap Transformación Cancela el anterior, ideal para búsquedas
mergeMap Transformación Paralelo, todos a la vez
concatMap Transformación Secuencial, respeta el orden
exhaustMap Transformación Ignora nuevos mientras hay uno activo (submit)
filter Filtrado Solo pasa lo que cumple la condición
debounceTime Filtrado Espera silencio antes de emitir
distinctUntilChanged Filtrado Solo si cambia el valor
take / takeUntil Filtrado Limitar emisiones
catchError Errores Manejar errores
retry Errores Reintentar
forkJoin Combinación Esperar varios (Promise.all)
combineLatest Combinación Último de todos
tap Utilidad Efectos secundarios sin modificar
startWith Utilidad Emite un valor inicial
shareReplay Multidifusión Comparte y recuerda la última emisión