← Volver al listado de tecnologías

Proyecciones y Sincronización

Por: SiempreListo
cqrsproyeccioneseventossincronizacióntypescript

Capítulo 13: Proyecciones y Sincronización

Las proyecciones son uno de los componentes más importantes en un sistema CQRS. Funcionan como traductores que escuchan los eventos generados por el Write Side y los transforman en estructuras de datos optimizadas para consultas en el Read Side.

Por qué necesitamos proyecciones

En CQRS, el Write Side almacena datos en un formato optimizado para escritura (normalmente normalizado en una base de datos relacional). Sin embargo, las consultas del Read Side necesitan datos en formatos diferentes, desnormalizados y optimizados para lecturas rapidas.

Las proyecciones resuelven este problema:

  1. Escuchan eventos del Write Side en tiempo real
  2. Transforman esos eventos en estructuras de lectura
  3. Almacenan los resultados en bases de datos optimizadas (Elasticsearch, Redis, MongoDB)
  4. Mantienen la sincronización entre ambos lados

Concepto de Proyección

Una proyección implementa tres responsabilidades fundamentales:

// src/projections/types.ts
export interface Projection<T> {
  name: string;
  handle(event: DomainEvent): Promise<void>;
  rebuild(): Promise<void>;
  getPosition(): Promise<number>;
}

export interface ProjectionState {
  lastEventId: number;
  lastUpdated: Date;
  status: 'running' | 'paused' | 'rebuilding';
}

Proyección de Pedidos

Esta proyección transforma eventos de pedidos en documentos indexados en Elasticsearch. Cada método maneja un tipo de evento específico.

El patrón switch por tipo de evento es comun en proyecciones: recibimos el evento genérico y delegamos al handler específico.

// src/projections/order-summary.projection.ts
import { Client } from '@elastic/elasticsearch';

export class OrderSummaryProjection implements Projection<OrderSummary> {
  constructor(
    private elastic: Client,
    private eventStore: EventStore
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    switch (event.type) {
      case 'OrderCreated':
        await this.onOrderCreated(event);
        break;
      case 'OrderItemAdded':
        await this.onItemAdded(event);
        break;
      case 'OrderShipped':
        await this.onOrderShipped(event);
        break;
    }
  }

  private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
    await this.elastic.index({
      index: 'orders',
      id: event.aggregateId,
      document: {
        orderId: event.aggregateId,
        customerId: event.data.customerId,
        status: 'pending',
        items: [],
        total: 0,
        createdAt: event.occurredAt
      }
    });
  }

  private async onItemAdded(event: OrderItemAddedEvent): Promise<void> {
    await this.elastic.update({
      index: 'orders',
      id: event.aggregateId,
      script: {
        source: `
          ctx._source.items.add(params.item);
          ctx._source.total += params.item.price * params.item.quantity
        `,
        params: { item: event.data.item }
      }
    });
  }

  private async onOrderShipped(event: OrderShippedEvent): Promise<void> {
    await this.elastic.update({
      index: 'orders',
      id: event.aggregateId,
      doc: {
        status: 'shipped',
        shippedAt: event.occurredAt
      }
    });
  }
}

Gestor de Proyecciones

El ProjectionManager coordina múltiples proyecciones, asegurando que cada una:

// src/projections/projection-manager.ts
export class ProjectionManager {
  private projections: Map<string, Projection<unknown>> = new Map();
  private positions: Map<string, number> = new Map();

  register(projection: Projection<unknown>): void {
    this.projections.set(projection.name, projection);
  }

  async start(): Promise<void> {
    for (const [name, projection] of this.projections) {
      const position = await projection.getPosition();
      this.positions.set(name, position);
      this.subscribeToEvents(name, projection, position);
    }
  }

  private subscribeToEvents(
    name: string,
    projection: Projection<unknown>,
    fromPosition: number
  ): void {
    this.eventStore.subscribe(async (event, position) => {
      if (position > fromPosition) {
        await projection.handle(event);
        this.positions.set(name, position);
        await this.savePosition(name, position);
      }
    });
  }

  async rebuild(projectionName: string): Promise<void> {
    const projection = this.projections.get(projectionName);
    if (!projection) throw new Error(`Projection ${projectionName} not found`);

    await projection.rebuild();
  }
}

Reconstrucción de Proyecciones

La reconstrucción (rebuild) permite regenerar completamente un Read Model desde el historial de eventos. Esto es util cuando:

El procesamiento por lotes (batch) evita sobrecargar la memoria al procesar millones de eventos.

// src/projections/rebuilder.ts
export class ProjectionRebuilder {
  constructor(
    private eventStore: EventStore,
    private batchSize: number = 1000
  ) {}

  async rebuild(projection: Projection<unknown>): Promise<void> {
    let position = 0;
    let hasMore = true;

    while (hasMore) {
      const events = await this.eventStore.getEvents(position, this.batchSize);

      for (const event of events) {
        await projection.handle(event);
        position = event.position;
      }

      hasMore = events.length === this.batchSize;
    }
  }
}

Sincronización con Múltiples Read Models

Una sola proyección puede actualizar múltiples bases de datos simultáneamente. Esto es util cuando diferentes consultas requieren diferentes tecnologías de almacenamiento.

Promise.all ejecuta las actualizaciones en paralelo para mejor rendimiento.

// src/projections/multi-store.projection.ts
export class OrderMultiStoreProjection {
  constructor(
    private elastic: Client,
    private redis: Redis,
    private mongo: MongoClient
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    await Promise.all([
      this.updateElastic(event),
      this.updateRedisCache(event),
      this.updateMongoReports(event)
    ]);
  }

  private async updateRedisCache(event: DomainEvent): Promise<void> {
    if (event.type === 'OrderCreated') {
      await this.redis.setex(
        `order:${event.aggregateId}`,
        3600,
        JSON.stringify(event.data)
      );
    }
  }
}

Manejo de Errores en Proyecciones

Las proyecciones deben ser resilientes a fallos. El patrón Dead Letter Queue (DLQ) almacena eventos que fallan repetidamente para análisis posterior, evitando que bloqueen el procesamiento.

RetryPolicy define cuántas veces reintentar y con qué intervalos antes de enviar a la DLQ.

// src/projections/resilient-projection.ts
export class ResilientProjection<T> implements Projection<T> {
  constructor(
    private inner: Projection<T>,
    private retryPolicy: RetryPolicy,
    private deadLetterQueue: DeadLetterQueue
  ) {}

  async handle(event: DomainEvent): Promise<void> {
    try {
      await this.retryPolicy.execute(() => this.inner.handle(event));
    } catch (error) {
      await this.deadLetterQueue.push({
        event,
        projection: this.name,
        error: error.message,
        timestamp: new Date()
      });
    }
  }
}

Resumen

Las proyecciones permiten:

Glosario

Proyección

Definición: Componente que escucha eventos del Write Side y los transforma en estructuras de datos optimizadas para el Read Side.

Por qué es importante: Permite que el Read Side tenga datos desnormalizados y optimizados para consultas rápidas, sin afectar la integridad del Write Side.

Ejemplo práctico: Una proyección que escucha eventos OrderCreated e ItemAdded para mantener un documento de resumen de pedido en Elasticsearch con todos los datos precalculados.


Position (Posición)

Definición: Número secuencial que identifica hasta qué evento ha procesado una proyección en el stream de eventos.

Por qué es importante: Permite que una proyección se recupere de reinicios sin perder eventos ni procesarlos dos veces. Es el “marcador” de progreso.

Ejemplo práctico: Si una proyección tiene position=1000, al reiniciarse solo procesará eventos desde el 1001 en adelante.


Rebuild (Reconstrucción)

Definición: Proceso de regenerar completamente un Read Model leyendo todos los eventos desde el inicio del historial.

Por qué es importante: Permite corregir errores, migrar datos o agregar nuevas proyecciones sin perder información, ya que los eventos son la fuente de verdad.

Ejemplo práctico: Al agregar un nuevo campo “totalWithTax” al Read Model, ejecutas rebuild para calcularlo en todos los pedidos históricos.


Dead Letter Queue (DLQ)

Definición: Cola especial donde se almacenan eventos que fallaron repetidamente al ser procesados por una proyección.

Por qué es importante: Evita que eventos problemáticos bloqueen el procesamiento de eventos posteriores. Permite análisis y reprocesamiento manual.

Ejemplo práctico: Un evento con datos corruptos falla 3 veces y se mueve a la DLQ, permitiendo que los siguientes eventos se procesen normalmente.


Batch Processing (Procesamiento por Lotes)

Definición: Técnica que procesa eventos en grupos de tamaño fijo en lugar de uno por uno.

Por qué es importante: Optimiza el uso de memoria y recursos al procesar grandes volúmenes de eventos durante reconstrucciones.

Ejemplo práctico: Procesar 1000 eventos, persistir progreso, cargar los siguientes 1000, en lugar de cargar millones de eventos en memoria.


← Capítulo 12: Elasticsearch | Capítulo 14: Caché con Redis →