← Volver al listado de tecnologías

Capítulo 16: Snapshots y Optimización

Por: SiempreListo
event-sourcingsnapshotsoptimizaciónrendimiento

Capítulo 16: Snapshots y Optimización

“Los snapshots aceleran la reconstrucción de agregados con historial largo”

Introducción a Snapshots

Un snapshot es una fotografía del estado de un agregado en un momento específico. En lugar de reproducir miles de eventos desde el inicio, cargamos el snapshot y solo aplicamos los eventos posteriores.

Es una técnica de optimización que sacrifica un poco de pureza (almacenamos estado derivado) por rendimiento práctico. Los snapshots son opcionales y pueden regenerarse en cualquier momento desde los eventos.

El Problema del Historial Largo

Cuando un agregado acumula miles de eventos, rehidratarlo se vuelve costoso:

Tiempo de carga sin snapshot:
- 100 eventos:   ~10ms
- 1,000 eventos: ~100ms
- 10,000 eventos: ~1,000ms  ⚠️
- 100,000 eventos: ~10,000ms ❌

Solución: Snapshots

Un snapshot captura el estado completo del agregado en un punto específico. Al cargar, combinamos snapshot + eventos posteriores:

graph LR
    E1[E1] --> E2[E2] --> E3[E3] --> S1[Snapshot v3]
    S1 --> E4[E4] --> E5[E5] --> E6[E6]

    style S1 fill:#f9f,stroke:#333

Implementación TypeScript

El sistema de snapshots consta de:

  1. Snapshot Store: Persiste snapshots (uno por agregado, se sobrescribe)
  2. Serialización del estado: El agregado debe poder exportar e importar su estado
  3. Repositorio mejorado: Coordina lectura de snapshot + eventos adicionales
// src/infrastructure/snapshots/types.ts
export interface Snapshot<T> {
  streamId: string;
  version: number;
  state: T;
  createdAt: Date;
}

export interface SnapshotStore {
  save<T>(snapshot: Snapshot<T>): Promise<void>;
  get<T>(streamId: string): Promise<Snapshot<T> | null>;
  delete(streamId: string): Promise<void>;
}

// src/infrastructure/snapshots/postgres-snapshot-store.ts
import { eq } from 'drizzle-orm';
import { Database } from '../database/connection';
import { snapshots } from '../database/schema';

export class PostgresSnapshotStore implements SnapshotStore {
  constructor(private db: Database) {}

  async save<T>(snapshot: Snapshot<T>): Promise<void> {
    await this.db
      .insert(snapshots)
      .values({
        streamId: snapshot.streamId,
        version: snapshot.version,
        data: snapshot.state,
        createdAt: snapshot.createdAt
      })
      .onConflictDoUpdate({
        target: snapshots.streamId,
        set: {
          version: snapshot.version,
          data: snapshot.state,
          createdAt: snapshot.createdAt
        }
      });
  }

  async get<T>(streamId: string): Promise<Snapshot<T> | null> {
    const [row] = await this.db
      .select()
      .from(snapshots)
      .where(eq(snapshots.streamId, streamId));

    if (!row) return null;

    return {
      streamId: row.streamId,
      version: row.version,
      state: row.data as T,
      createdAt: row.createdAt ?? new Date()
    };
  }

  async delete(streamId: string): Promise<void> {
    await this.db
      .delete(snapshots)
      .where(eq(snapshots.streamId, streamId));
  }
}

Order con Snapshots

// src/domain/aggregates/order/order-state.ts
export interface OrderSnapshot {
  id: string;
  customerId: string;
  customerEmail: string;
  items: Array<{
    productId: string;
    productName: string;
    sku: string;
    quantity: number;
    unitPrice: { amount: number; currency: string };
  }>;
  shippingAddress: Address | null;
  billingAddress: Address | null;
  status: OrderStatus;
  subtotal: { amount: number; currency: string };
  tax: { amount: number; currency: string };
  total: { amount: number; currency: string };
  paymentId: string | null;
  trackingNumber: string | null;
}

// src/domain/aggregates/order/order.ts
export class Order {
  // ... código anterior ...

  // Crear snapshot del estado actual
  toSnapshot(): OrderSnapshot {
    return {
      id: this.state.id,
      customerId: this.state.customerId,
      customerEmail: this.state.customerEmail,
      items: [...this.state.items.values()].map(item => ({
        productId: item.productId,
        productName: item.productName,
        sku: item.sku,
        quantity: item.quantity,
        unitPrice: item.unitPrice
      })),
      shippingAddress: this.state.shippingAddress,
      billingAddress: this.state.billingAddress,
      status: this.state.status,
      subtotal: this.state.subtotal,
      tax: this.state.tax,
      total: this.state.total,
      paymentId: this.state.paymentId,
      trackingNumber: this.state.trackingNumber
    };
  }

  // Restaurar desde snapshot
  static fromSnapshot(snapshot: OrderSnapshot, version: number): Order {
    const order = new Order();

    order.state = {
      id: snapshot.id,
      customerId: snapshot.customerId,
      customerEmail: snapshot.customerEmail,
      items: new Map(
        snapshot.items.map(item => [item.productId, item])
      ),
      shippingAddress: snapshot.shippingAddress,
      billingAddress: snapshot.billingAddress,
      status: snapshot.status,
      subtotal: snapshot.subtotal,
      tax: snapshot.tax,
      total: snapshot.total,
      paymentId: snapshot.paymentId,
      trackingNumber: snapshot.trackingNumber,
      version
    };

    return order;
  }

  // Rehidratar con snapshot + eventos posteriores
  static fromSnapshotAndEvents(
    snapshot: OrderSnapshot,
    snapshotVersion: number,
    events: DomainEvent[]
  ): Order {
    const order = Order.fromSnapshot(snapshot, snapshotVersion);

    // Aplicar solo eventos posteriores al snapshot
    events.forEach(event => order.when(event));

    return order;
  }
}

Repositorio con Snapshots

El repositorio decide cuándo crear snapshots y cómo combinarlos con eventos. La constante SNAPSHOT_INTERVAL define cada cuántos eventos se toma un nuevo snapshot.

La lógica de carga es:

  1. Buscar snapshot existente
  2. Si existe: cargar eventos desde snapshot.version + 1
  3. Si no existe: cargar todos los eventos (comportamiento original)
// src/infrastructure/repository/order-repository.ts
export class OrderRepository {
  private readonly SNAPSHOT_INTERVAL = 100; // Snapshot cada 100 eventos

  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore
  ) {}

  async save(order: Order): Promise<void> {
    const uncommitted = order.getUncommittedEvents();
    if (uncommitted.length === 0) return;

    const streamId = `order-${order.id}`;
    const expectedVersion = order.version - uncommitted.length;

    await this.eventStore.append(streamId, uncommitted, expectedVersion);

    // Crear snapshot si es necesario
    if (order.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotStore.save({
        streamId,
        version: order.version,
        state: order.toSnapshot(),
        createdAt: new Date()
      });
    }

    order.clearUncommittedEvents();
  }

  async getById(orderId: string): Promise<Order | null> {
    const streamId = `order-${orderId}`;

    // Intentar cargar snapshot
    const snapshot = await this.snapshotStore.get<OrderSnapshot>(streamId);

    if (snapshot) {
      // Cargar solo eventos posteriores al snapshot
      const events = await this.eventStore.readStream(streamId, {
        fromVersion: snapshot.version + 1
      });

      if (events.length === 0) {
        return Order.fromSnapshot(snapshot.state, snapshot.version);
      }

      const domainEvents = this.toDomainEvents(events);
      return Order.fromSnapshotAndEvents(
        snapshot.state,
        snapshot.version,
        domainEvents
      );
    }

    // Sin snapshot, cargar todos los eventos
    const events = await this.eventStore.readStream(streamId);
    if (events.length === 0) return null;

    return Order.fromEvents(this.toDomainEvents(events));
  }

  private toDomainEvents(stored: StoredEvent[]): DomainEvent[] {
    // Implementación de deserialización
    return stored.map(e => ({
      eventId: e.globalPosition.toString(),
      eventType: e.eventType,
      aggregateId: e.streamId,
      aggregateType: 'Order',
      version: e.streamPosition,
      payload: e.data,
      metadata: e.metadata as EventMetadata
    }));
  }
}

Implementación Go

// internal/infrastructure/snapshots/store.go
package snapshots

import (
    "context"
    "encoding/json"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
)

type Snapshot struct {
    StreamID  string
    Version   int
    State     json.RawMessage
    CreatedAt time.Time
}

type Store interface {
    Save(ctx context.Context, snapshot Snapshot) error
    Get(ctx context.Context, streamID string) (*Snapshot, error)
    Delete(ctx context.Context, streamID string) error
}

type PostgresStore struct {
    pool *pgxpool.Pool
}

func NewPostgresStore(pool *pgxpool.Pool) *PostgresStore {
    return &PostgresStore{pool: pool}
}

func (s *PostgresStore) Save(ctx context.Context, snapshot Snapshot) error {
    query := `
        INSERT INTO snapshots (stream_id, version, data, created_at)
        VALUES ($1, $2, $3, $4)
        ON CONFLICT (stream_id) DO UPDATE SET
            version = $2,
            data = $3,
            created_at = $4
    `

    _, err := s.pool.Exec(ctx, query,
        snapshot.StreamID,
        snapshot.Version,
        snapshot.State,
        snapshot.CreatedAt,
    )

    return err
}

func (s *PostgresStore) Get(ctx context.Context, streamID string) (*Snapshot, error) {
    query := `
        SELECT stream_id, version, data, created_at
        FROM snapshots
        WHERE stream_id = $1
    `

    var snapshot Snapshot
    err := s.pool.QueryRow(ctx, query, streamID).Scan(
        &snapshot.StreamID,
        &snapshot.Version,
        &snapshot.State,
        &snapshot.CreatedAt,
    )

    if err != nil {
        return nil, err
    }

    return &snapshot, nil
}

Estrategias de Snapshot

No existe una estrategia universal; depende del patrón de uso de tu aplicación:

EstrategiaDescripciónProsContras
Cada N eventosSnapshot cada 100/1000 eventosSimple, predeciblePuede ser innecesario
Por tiempoSnapshot cada hora/díaConsistentePuede ser muy frecuente
Por tamañoCuando el stream supera X eventosOptimizadoMás complejo
Bajo demandaSolo cuando se detecta lentitudEficienteReactivo

Recomendación: Empieza con “cada N eventos” (100-500) y ajusta basándote en métricas reales de carga.

Métricas y Monitoreo

Monitorear la efectividad de los snapshots es crucial para ajustar la estrategia. Las métricas clave son:

// src/infrastructure/monitoring/snapshot-metrics.ts
export class SnapshotMetrics {
  private loadTimes: number[] = [];
  private snapshotHits = 0;
  private snapshotMisses = 0;

  recordLoad(timeMs: number, usedSnapshot: boolean): void {
    this.loadTimes.push(timeMs);
    if (usedSnapshot) {
      this.snapshotHits++;
    } else {
      this.snapshotMisses++;
    }
  }

  getStats(): {
    avgLoadTime: number;
    hitRate: number;
    p95LoadTime: number;
  } {
    const sorted = [...this.loadTimes].sort((a, b) => a - b);
    const total = this.snapshotHits + this.snapshotMisses;

    return {
      avgLoadTime: sorted.reduce((a, b) => a + b, 0) / sorted.length,
      hitRate: total > 0 ? this.snapshotHits / total : 0,
      p95LoadTime: sorted[Math.floor(sorted.length * 0.95)] ?? 0
    };
  }
}

Resumen

Glosario

Snapshot

Definición: Representación serializada del estado completo de un agregado en un punto específico de su historia (versión).

Por qué es importante: Permite reconstruir un agregado en O(k) donde k son los eventos desde el snapshot, en lugar de O(n) donde n es el historial completo.

Ejemplo práctico: Un Order con 5000 eventos tarda 500ms en cargar; con snapshot en versión 4900, carga en 10ms (snapshot + 100 eventos).


Snapshot Store

Definición: Componente que persiste y recupera snapshots, típicamente en una tabla separada o key-value store.

Por qué es importante: Separa el almacenamiento de snapshots del Event Store, permitiendo diferentes estrategias de retención y limpieza.

Ejemplo práctico: Tabla snapshots con columnas stream_id (PK), version, data (JSON), created_at. Solo guarda el último snapshot por agregado.


Rehidratación Parcial

Definición: Proceso de reconstruir un agregado aplicando solo los eventos posteriores a un snapshot existente.

Por qué es importante: Combina la velocidad de cargar estado serializado con la precisión de aplicar eventos recientes.

Ejemplo práctico: Order.fromSnapshotAndEvents(snapshot, [event4901, event4902, ...]) en lugar de Order.fromEvents([event1, ..., event5000]).


UPSERT (ON CONFLICT DO UPDATE)

Definición: Operación SQL que inserta un registro si no existe, o lo actualiza si ya existe (basado en clave única).

Por qué es importante: Permite sobrescribir el snapshot anterior de forma atómica, manteniendo solo el más reciente sin lógica adicional.

Ejemplo práctico: INSERT INTO snapshots ... ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3 reemplaza el snapshot anterior.


Hit Rate

Definición: Porcentaje de operaciones que encontraron un recurso en cache (en este caso, un snapshot útil).

Por qué es importante: Un hit rate bajo indica que los snapshots se crean con poca frecuencia o que los agregados no se acceden repetidamente.

Ejemplo práctico: Hit rate del 95% significa que 95 de cada 100 cargas de agregados encontraron snapshot, evitando reconstrucción completa.


Percentil 95 (p95)

Definición: Valor por debajo del cual se encuentra el 95% de las observaciones. El 5% peor de los casos supera este valor.

Por qué es importante: Identifica casos extremos ignorados por el promedio. Un p95 alto de carga indica agregados problemáticos que necesitan snapshots más frecuentes.

Ejemplo práctico: Promedio de carga 50ms, p95 de 800ms significa que aunque la mayoría carga rápido, 1 de cada 20 requests es muy lenta.


Snapshot Interval

Definición: Número de eventos entre creación de snapshots sucesivos para un mismo agregado.

Por qué es importante: Balancea entre overhead de crear snapshots (costo de escritura) y beneficio de cargas rápidas (costo de lectura).

Ejemplo práctico: Interval de 100 significa que después de los eventos 100, 200, 300… se persiste un snapshot. Cargas nunca procesan más de 99 eventos extras.


← Capítulo 15: EventStoreDB Python | Capítulo 17: Versionado de Eventos →