Capítulo 16: Snapshots y Optimización
Capítulo 16: Snapshots y Optimización
“Los snapshots aceleran la reconstrucción de agregados con historial largo”
Introducción a Snapshots
Un snapshot es una fotografía del estado de un agregado en un momento específico. En lugar de reproducir miles de eventos desde el inicio, cargamos el snapshot y solo aplicamos los eventos posteriores.
Es una técnica de optimización que sacrifica un poco de pureza (almacenamos estado derivado) por rendimiento práctico. Los snapshots son opcionales y pueden regenerarse en cualquier momento desde los eventos.
El Problema del Historial Largo
Cuando un agregado acumula miles de eventos, rehidratarlo se vuelve costoso:
Tiempo de carga sin snapshot:
- 100 eventos: ~10ms
- 1,000 eventos: ~100ms
- 10,000 eventos: ~1,000ms ⚠️
- 100,000 eventos: ~10,000ms ❌
Solución: Snapshots
Un snapshot captura el estado completo del agregado en un punto específico. Al cargar, combinamos snapshot + eventos posteriores:
graph LR
E1[E1] --> E2[E2] --> E3[E3] --> S1[Snapshot v3]
S1 --> E4[E4] --> E5[E5] --> E6[E6]
style S1 fill:#f9f,stroke:#333
Implementación TypeScript
El sistema de snapshots consta de:
- Snapshot Store: Persiste snapshots (uno por agregado, se sobrescribe)
- Serialización del estado: El agregado debe poder exportar e importar su estado
- Repositorio mejorado: Coordina lectura de snapshot + eventos adicionales
// src/infrastructure/snapshots/types.ts
export interface Snapshot<T> {
streamId: string;
version: number;
state: T;
createdAt: Date;
}
export interface SnapshotStore {
save<T>(snapshot: Snapshot<T>): Promise<void>;
get<T>(streamId: string): Promise<Snapshot<T> | null>;
delete(streamId: string): Promise<void>;
}
// src/infrastructure/snapshots/postgres-snapshot-store.ts
import { eq } from 'drizzle-orm';
import { Database } from '../database/connection';
import { snapshots } from '../database/schema';
export class PostgresSnapshotStore implements SnapshotStore {
constructor(private db: Database) {}
async save<T>(snapshot: Snapshot<T>): Promise<void> {
await this.db
.insert(snapshots)
.values({
streamId: snapshot.streamId,
version: snapshot.version,
data: snapshot.state,
createdAt: snapshot.createdAt
})
.onConflictDoUpdate({
target: snapshots.streamId,
set: {
version: snapshot.version,
data: snapshot.state,
createdAt: snapshot.createdAt
}
});
}
async get<T>(streamId: string): Promise<Snapshot<T> | null> {
const [row] = await this.db
.select()
.from(snapshots)
.where(eq(snapshots.streamId, streamId));
if (!row) return null;
return {
streamId: row.streamId,
version: row.version,
state: row.data as T,
createdAt: row.createdAt ?? new Date()
};
}
async delete(streamId: string): Promise<void> {
await this.db
.delete(snapshots)
.where(eq(snapshots.streamId, streamId));
}
}
Order con Snapshots
// src/domain/aggregates/order/order-state.ts
export interface OrderSnapshot {
id: string;
customerId: string;
customerEmail: string;
items: Array<{
productId: string;
productName: string;
sku: string;
quantity: number;
unitPrice: { amount: number; currency: string };
}>;
shippingAddress: Address | null;
billingAddress: Address | null;
status: OrderStatus;
subtotal: { amount: number; currency: string };
tax: { amount: number; currency: string };
total: { amount: number; currency: string };
paymentId: string | null;
trackingNumber: string | null;
}
// src/domain/aggregates/order/order.ts
export class Order {
// ... código anterior ...
// Crear snapshot del estado actual
toSnapshot(): OrderSnapshot {
return {
id: this.state.id,
customerId: this.state.customerId,
customerEmail: this.state.customerEmail,
items: [...this.state.items.values()].map(item => ({
productId: item.productId,
productName: item.productName,
sku: item.sku,
quantity: item.quantity,
unitPrice: item.unitPrice
})),
shippingAddress: this.state.shippingAddress,
billingAddress: this.state.billingAddress,
status: this.state.status,
subtotal: this.state.subtotal,
tax: this.state.tax,
total: this.state.total,
paymentId: this.state.paymentId,
trackingNumber: this.state.trackingNumber
};
}
// Restaurar desde snapshot
static fromSnapshot(snapshot: OrderSnapshot, version: number): Order {
const order = new Order();
order.state = {
id: snapshot.id,
customerId: snapshot.customerId,
customerEmail: snapshot.customerEmail,
items: new Map(
snapshot.items.map(item => [item.productId, item])
),
shippingAddress: snapshot.shippingAddress,
billingAddress: snapshot.billingAddress,
status: snapshot.status,
subtotal: snapshot.subtotal,
tax: snapshot.tax,
total: snapshot.total,
paymentId: snapshot.paymentId,
trackingNumber: snapshot.trackingNumber,
version
};
return order;
}
// Rehidratar con snapshot + eventos posteriores
static fromSnapshotAndEvents(
snapshot: OrderSnapshot,
snapshotVersion: number,
events: DomainEvent[]
): Order {
const order = Order.fromSnapshot(snapshot, snapshotVersion);
// Aplicar solo eventos posteriores al snapshot
events.forEach(event => order.when(event));
return order;
}
}
Repositorio con Snapshots
El repositorio decide cuándo crear snapshots y cómo combinarlos con eventos. La constante SNAPSHOT_INTERVAL define cada cuántos eventos se toma un nuevo snapshot.
La lógica de carga es:
- Buscar snapshot existente
- Si existe: cargar eventos desde
snapshot.version + 1 - Si no existe: cargar todos los eventos (comportamiento original)
// src/infrastructure/repository/order-repository.ts
export class OrderRepository {
private readonly SNAPSHOT_INTERVAL = 100; // Snapshot cada 100 eventos
constructor(
private eventStore: EventStore,
private snapshotStore: SnapshotStore
) {}
async save(order: Order): Promise<void> {
const uncommitted = order.getUncommittedEvents();
if (uncommitted.length === 0) return;
const streamId = `order-${order.id}`;
const expectedVersion = order.version - uncommitted.length;
await this.eventStore.append(streamId, uncommitted, expectedVersion);
// Crear snapshot si es necesario
if (order.version % this.SNAPSHOT_INTERVAL === 0) {
await this.snapshotStore.save({
streamId,
version: order.version,
state: order.toSnapshot(),
createdAt: new Date()
});
}
order.clearUncommittedEvents();
}
async getById(orderId: string): Promise<Order | null> {
const streamId = `order-${orderId}`;
// Intentar cargar snapshot
const snapshot = await this.snapshotStore.get<OrderSnapshot>(streamId);
if (snapshot) {
// Cargar solo eventos posteriores al snapshot
const events = await this.eventStore.readStream(streamId, {
fromVersion: snapshot.version + 1
});
if (events.length === 0) {
return Order.fromSnapshot(snapshot.state, snapshot.version);
}
const domainEvents = this.toDomainEvents(events);
return Order.fromSnapshotAndEvents(
snapshot.state,
snapshot.version,
domainEvents
);
}
// Sin snapshot, cargar todos los eventos
const events = await this.eventStore.readStream(streamId);
if (events.length === 0) return null;
return Order.fromEvents(this.toDomainEvents(events));
}
private toDomainEvents(stored: StoredEvent[]): DomainEvent[] {
// Implementación de deserialización
return stored.map(e => ({
eventId: e.globalPosition.toString(),
eventType: e.eventType,
aggregateId: e.streamId,
aggregateType: 'Order',
version: e.streamPosition,
payload: e.data,
metadata: e.metadata as EventMetadata
}));
}
}
Implementación Go
// internal/infrastructure/snapshots/store.go
package snapshots
import (
"context"
"encoding/json"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Snapshot struct {
StreamID string
Version int
State json.RawMessage
CreatedAt time.Time
}
type Store interface {
Save(ctx context.Context, snapshot Snapshot) error
Get(ctx context.Context, streamID string) (*Snapshot, error)
Delete(ctx context.Context, streamID string) error
}
type PostgresStore struct {
pool *pgxpool.Pool
}
func NewPostgresStore(pool *pgxpool.Pool) *PostgresStore {
return &PostgresStore{pool: pool}
}
func (s *PostgresStore) Save(ctx context.Context, snapshot Snapshot) error {
query := `
INSERT INTO snapshots (stream_id, version, data, created_at)
VALUES ($1, $2, $3, $4)
ON CONFLICT (stream_id) DO UPDATE SET
version = $2,
data = $3,
created_at = $4
`
_, err := s.pool.Exec(ctx, query,
snapshot.StreamID,
snapshot.Version,
snapshot.State,
snapshot.CreatedAt,
)
return err
}
func (s *PostgresStore) Get(ctx context.Context, streamID string) (*Snapshot, error) {
query := `
SELECT stream_id, version, data, created_at
FROM snapshots
WHERE stream_id = $1
`
var snapshot Snapshot
err := s.pool.QueryRow(ctx, query, streamID).Scan(
&snapshot.StreamID,
&snapshot.Version,
&snapshot.State,
&snapshot.CreatedAt,
)
if err != nil {
return nil, err
}
return &snapshot, nil
}
Estrategias de Snapshot
No existe una estrategia universal; depende del patrón de uso de tu aplicación:
| Estrategia | Descripción | Pros | Contras |
|---|---|---|---|
| Cada N eventos | Snapshot cada 100/1000 eventos | Simple, predecible | Puede ser innecesario |
| Por tiempo | Snapshot cada hora/día | Consistente | Puede ser muy frecuente |
| Por tamaño | Cuando el stream supera X eventos | Optimizado | Más complejo |
| Bajo demanda | Solo cuando se detecta lentitud | Eficiente | Reactivo |
Recomendación: Empieza con “cada N eventos” (100-500) y ajusta basándote en métricas reales de carga.
Métricas y Monitoreo
Monitorear la efectividad de los snapshots es crucial para ajustar la estrategia. Las métricas clave son:
- Hit rate: Porcentaje de cargas que encontraron snapshot (debe ser alto)
- Tiempo promedio de carga: Con y sin snapshot (debe reducirse significativamente)
- p95 de carga: El percentil 95 identifica casos extremos
// src/infrastructure/monitoring/snapshot-metrics.ts
export class SnapshotMetrics {
private loadTimes: number[] = [];
private snapshotHits = 0;
private snapshotMisses = 0;
recordLoad(timeMs: number, usedSnapshot: boolean): void {
this.loadTimes.push(timeMs);
if (usedSnapshot) {
this.snapshotHits++;
} else {
this.snapshotMisses++;
}
}
getStats(): {
avgLoadTime: number;
hitRate: number;
p95LoadTime: number;
} {
const sorted = [...this.loadTimes].sort((a, b) => a - b);
const total = this.snapshotHits + this.snapshotMisses;
return {
avgLoadTime: sorted.reduce((a, b) => a + b, 0) / sorted.length,
hitRate: total > 0 ? this.snapshotHits / total : 0,
p95LoadTime: sorted[Math.floor(sorted.length * 0.95)] ?? 0
};
}
}
Resumen
- Los snapshots capturan el estado completo del agregado
- Reducen dramáticamente el tiempo de carga
- Se crean según estrategias configurables
- El repositorio combina snapshot + eventos posteriores
- Es importante monitorear la efectividad de los snapshots
Glosario
Snapshot
Definición: Representación serializada del estado completo de un agregado en un punto específico de su historia (versión).
Por qué es importante: Permite reconstruir un agregado en O(k) donde k son los eventos desde el snapshot, en lugar de O(n) donde n es el historial completo.
Ejemplo práctico: Un Order con 5000 eventos tarda 500ms en cargar; con snapshot en versión 4900, carga en 10ms (snapshot + 100 eventos).
Snapshot Store
Definición: Componente que persiste y recupera snapshots, típicamente en una tabla separada o key-value store.
Por qué es importante: Separa el almacenamiento de snapshots del Event Store, permitiendo diferentes estrategias de retención y limpieza.
Ejemplo práctico: Tabla snapshots con columnas stream_id (PK), version, data (JSON), created_at. Solo guarda el último snapshot por agregado.
Rehidratación Parcial
Definición: Proceso de reconstruir un agregado aplicando solo los eventos posteriores a un snapshot existente.
Por qué es importante: Combina la velocidad de cargar estado serializado con la precisión de aplicar eventos recientes.
Ejemplo práctico: Order.fromSnapshotAndEvents(snapshot, [event4901, event4902, ...]) en lugar de Order.fromEvents([event1, ..., event5000]).
UPSERT (ON CONFLICT DO UPDATE)
Definición: Operación SQL que inserta un registro si no existe, o lo actualiza si ya existe (basado en clave única).
Por qué es importante: Permite sobrescribir el snapshot anterior de forma atómica, manteniendo solo el más reciente sin lógica adicional.
Ejemplo práctico: INSERT INTO snapshots ... ON CONFLICT (stream_id) DO UPDATE SET version = $2, data = $3 reemplaza el snapshot anterior.
Hit Rate
Definición: Porcentaje de operaciones que encontraron un recurso en cache (en este caso, un snapshot útil).
Por qué es importante: Un hit rate bajo indica que los snapshots se crean con poca frecuencia o que los agregados no se acceden repetidamente.
Ejemplo práctico: Hit rate del 95% significa que 95 de cada 100 cargas de agregados encontraron snapshot, evitando reconstrucción completa.
Percentil 95 (p95)
Definición: Valor por debajo del cual se encuentra el 95% de las observaciones. El 5% peor de los casos supera este valor.
Por qué es importante: Identifica casos extremos ignorados por el promedio. Un p95 alto de carga indica agregados problemáticos que necesitan snapshots más frecuentes.
Ejemplo práctico: Promedio de carga 50ms, p95 de 800ms significa que aunque la mayoría carga rápido, 1 de cada 20 requests es muy lenta.
Snapshot Interval
Definición: Número de eventos entre creación de snapshots sucesivos para un mismo agregado.
Por qué es importante: Balancea entre overhead de crear snapshots (costo de escritura) y beneficio de cargas rápidas (costo de lectura).
Ejemplo práctico: Interval de 100 significa que después de los eventos 100, 200, 300… se persiste un snapshot. Cargas nunca procesan más de 99 eventos extras.
← Capítulo 15: EventStoreDB Python | Capítulo 17: Versionado de Eventos →