Capítulo 4: Event Store - Conceptos
Capítulo 4: Event Store - Conceptos
“El Event Store es el corazón de un sistema Event Sourced”
¿Qué es un Event Store?
El Event Store es la base de datos donde viven todos los eventos de tu sistema. A diferencia de una base de datos tradicional donde actualizas filas, en el Event Store solo agregas nuevas filas (eventos).
Piensa en él como un libro de contabilidad: nunca borras una entrada, solo agregas nuevas. Si cometiste un error, agregas una entrada de corrección.
Un Event Store es una base de datos especializada para almacenar eventos de manera:
- Append-only: Solo se agregan eventos, nunca se modifican ni eliminan
- Ordenada: Los eventos mantienen su orden temporal exacto
- Durable: Los eventos persisten de forma confiable (no se pierden)
Conceptos Fundamentales
Streams (Flujos de Eventos)
Un stream es una secuencia ordenada de eventos que pertenecen al mismo agregado. El nombre del stream típicamente incluye el tipo de agregado y su ID, por ejemplo: order-123, customer-456.
Cada agregado tiene su propio stream. Cuando rehidratas un agregado, lees todos los eventos de su stream en orden.
Un stream es una secuencia ordenada de eventos relacionados con un agregado:
Stream: order-123
├── [0] OrderCreated { customerId: "c-1", ... }
├── [1] OrderItemAdded { productId: "p-1", quantity: 2, ... }
├── [2] OrderItemAdded { productId: "p-2", quantity: 1, ... }
├── [3] ShippingAddressSet { address: {...}, ... }
└── [4] OrderConfirmed { total: 89.97, ... }
Posición y Versión
| Concepto | Descripción |
|---|---|
| Stream Position | Posición del evento dentro de su stream (0, 1, 2…) |
| Global Position | Posición del evento en todo el store |
| Version | Versión del agregado después del evento |
Global: 1001 | Stream: order-123 | Position: 0 | OrderCreated
Global: 1002 | Stream: order-456 | Position: 0 | OrderCreated
Global: 1003 | Stream: order-123 | Position: 1 | OrderItemAdded
Global: 1004 | Stream: order-123 | Position: 2 | OrderConfirmed
Control de Concurrencia Optimista
¿Qué pasa si dos usuarios intentan modificar el mismo pedido simultáneamente? El control de concurrencia optimista resuelve esto:
- Al leer el agregado, obtienes su versión actual (ej: versión 5)
- Al guardar, indicas “espero que la versión sea 5”
- Si alguien más guardó primero (versión ahora es 6), tu escritura falla
- Debes recargar el agregado, re-aplicar tu lógica, e intentar de nuevo
Es “optimista” porque asume que los conflictos son raros y solo verifica al momento de escribir.
Evita conflictos de escritura usando la versión esperada:
// Pseudocódigo
async function appendToStream(
streamId: string,
events: Event[],
expectedVersion: number
): Promise<void> {
const currentVersion = await getStreamVersion(streamId);
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, but found ${currentVersion}`
);
}
await appendEvents(streamId, events);
}
Modelo de Datos
Esquema SQL (PostgreSQL)
-- Tabla principal de eventos
CREATE TABLE events (
global_position BIGSERIAL PRIMARY KEY,
stream_id VARCHAR(255) NOT NULL,
stream_position INT NOT NULL,
event_type VARCHAR(255) NOT NULL,
data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(stream_id, stream_position)
);
-- Índices para consultas comunes
CREATE INDEX idx_events_stream_id ON events(stream_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_created_at ON events(created_at);
-- Tabla de snapshots (opcional)
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
version INT NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
Esquema MongoDB
// Colección: events
{
_id: ObjectId,
globalPosition: NumberLong,
streamId: "order-123",
streamPosition: 0,
eventType: "OrderCreated",
data: {
customerId: "customer-456",
items: [...]
},
metadata: {
correlationId: "...",
causationId: "...",
userId: "..."
},
createdAt: ISODate
}
// Índices
db.events.createIndex({ streamId: 1, streamPosition: 1 }, { unique: true })
db.events.createIndex({ globalPosition: 1 })
db.events.createIndex({ eventType: 1 })
Operaciones del Event Store
Interface TypeScript
// src/infrastructure/event-store/types.ts
export interface StoredEvent {
globalPosition: bigint;
streamId: string;
streamPosition: number;
eventType: string;
data: unknown;
metadata: EventMetadata;
createdAt: Date;
}
export interface AppendResult {
nextExpectedVersion: number;
globalPosition: bigint;
}
export interface EventStore {
// Append events to a stream
append(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<AppendResult>;
// Read all events from a stream
readStream(streamId: string): Promise<StoredEvent[]>;
// Read events from a specific version
readStreamFrom(
streamId: string,
fromVersion: number
): Promise<StoredEvent[]>;
// Read all events (for projections)
readAll(fromPosition?: bigint): Promise<StoredEvent[]>;
// Subscribe to new events
subscribe(
fromPosition: bigint,
handler: (event: StoredEvent) => Promise<void>
): Subscription;
}
export interface Subscription {
unsubscribe(): void;
}
Interface Go
// infrastructure/eventstore/types.go
package eventstore
import (
"context"
"time"
)
type StoredEvent struct {
GlobalPosition int64
StreamID string
StreamPosition int
EventType string
Data []byte
Metadata []byte
CreatedAt time.Time
}
type AppendResult struct {
NextExpectedVersion int
GlobalPosition int64
}
type EventStore interface {
Append(ctx context.Context, streamID string, events []Event, expectedVersion int) (AppendResult, error)
ReadStream(ctx context.Context, streamID string) ([]StoredEvent, error)
ReadStreamFrom(ctx context.Context, streamID string, fromVersion int) ([]StoredEvent, error)
ReadAll(ctx context.Context, fromPosition int64) ([]StoredEvent, error)
Subscribe(ctx context.Context, fromPosition int64, handler func(StoredEvent) error) error
}
Interface Python
# infrastructure/event_store/types.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, AsyncIterator
@dataclass
class StoredEvent:
global_position: int
stream_id: str
stream_position: int
event_type: str
data: dict
metadata: dict
created_at: datetime
@dataclass
class AppendResult:
next_expected_version: int
global_position: int
class EventStore(ABC):
@abstractmethod
async def append(
self,
stream_id: str,
events: list,
expected_version: int
) -> AppendResult:
pass
@abstractmethod
async def read_stream(self, stream_id: str) -> list[StoredEvent]:
pass
@abstractmethod
async def read_stream_from(
self,
stream_id: str,
from_version: int
) -> list[StoredEvent]:
pass
@abstractmethod
async def read_all(
self,
from_position: int = 0
) -> AsyncIterator[StoredEvent]:
pass
Garantías y Trade-offs
Consistencia
| Tipo | Descripción | Uso |
|---|---|---|
| Strong | Lectura inmediata después de escritura | Dentro del agregado |
| Eventual | Proyecciones se actualizan async | Read models |
Ordenamiento
Garantías de orden:
✓ Orden dentro de un stream: GARANTIZADO
✓ Orden global de append: GARANTIZADO
✗ Orden de procesamiento por consumidores: NO GARANTIZADO
Idempotencia
Usa IDs de evento únicos para detectar duplicados:
async function appendIdempotent(
streamId: string,
events: DomainEvent[]
): Promise<void> {
for (const event of events) {
const exists = await this.eventExists(event.eventId);
if (exists) {
console.log(`Event ${event.eventId} already exists, skipping`);
continue;
}
await this.append(streamId, [event]);
}
}
Soluciones de Event Store
Comparativa
| Solución | Tipo | Lenguajes | Características |
|---|---|---|---|
| EventStoreDB | Dedicado | Todos (gRPC) | Proyecciones nativas, subscripciones |
| PostgreSQL | SQL | Todos | Familiar, transacciones ACID |
| MongoDB | NoSQL | Todos | Flexible, escalable |
| Kafka | Log | Todos | Alto throughput, retención |
| DynamoDB | NoSQL | AWS SDKs | Serverless, escalable |
EventStoreDB
// Conexión a EventStoreDB
import { EventStoreDBClient } from '@eventstore/db-client';
const client = EventStoreDBClient.connectionString(
'esdb://localhost:2113?tls=false'
);
// Append
await client.appendToStream(
`order-${orderId}`,
events.map(e => ({
type: e.type,
data: e.payload
})),
{ expectedRevision: BigInt(expectedVersion) }
);
// Read
const stream = client.readStream(`order-${orderId}`);
for await (const event of stream) {
console.log(event.event?.type, event.event?.data);
}
Resumen
- El Event Store es append-only e inmutable
- Los eventos se organizan en streams por agregado
- El control de concurrencia optimista previene conflictos
- La posición global permite subscripciones ordenadas
- Existen soluciones dedicadas (EventStoreDB) y genéricas (PostgreSQL)
Glosario
Stream (Flujo de Eventos)
Definicion: Secuencia ordenada de eventos que pertenecen al mismo agregado, identificada por un nombre unico como order-123.
Por que es importante: Agrupa todos los eventos de una entidad, permitiendo rehidratar el agregado leyendo un solo stream en lugar de buscar en todos los eventos.
Ejemplo practico: El stream order-abc contiene [OrderCreated, ItemAdded, ItemAdded, OrderConfirmed]. Para obtener el estado del pedido abc, lees solo este stream.
Stream Position (Posicion en Stream)
Definicion: Numero secuencial (0, 1, 2…) que indica el orden del evento dentro de su stream especifico.
Por que es importante: Garantiza el orden correcto al rehidratar. El evento en posicion 0 siempre se aplica antes que el de posicion 1.
Ejemplo practico: En el stream order-123, el OrderCreated esta en posicion 0, el primer ItemAdded en posicion 1, etc.
Global Position (Posicion Global)
Definicion: Numero secuencial unico que indica el orden del evento en todo el Event Store, atravesando todos los streams.
Por que es importante: Permite a las proyecciones procesar eventos en orden cronologico exacto, sin importar a que stream pertenezcan.
Ejemplo practico: Si OrderCreated (stream order-1) tiene global 1000 y CustomerRegistered (stream customer-1) tiene global 1001, la proyeccion los procesa en ese orden.
Control de Concurrencia Optimista
Definicion: Tecnica que detecta conflictos de escritura comparando la version esperada con la version actual al momento de guardar.
Por que es importante: Previene que dos escrituras simultaneas corrompan los datos sin necesidad de bloqueos (locks) que reducen el rendimiento.
Ejemplo practico: Lees pedido (version 5), modificas, guardas esperando version 5. Si otro usuario guardo primero (version ahora 6), recibes ConcurrencyError y debes reintentar.
Append-only
Definicion: Modelo de escritura donde solo se agregan nuevos registros; nunca se modifican ni eliminan registros existentes.
Por que es importante: Garantiza inmutabilidad de los eventos. El historial nunca cambia, lo que permite auditorias confiables y reproducibilidad.
Ejemplo practico: Si un pedido se cancela, no eliminas el evento OrderCreated; agregas un nuevo evento OrderCancelled. Ambos permanecen en el store.
Version del Agregado
Definicion: Contador que indica cuantos eventos tiene el agregado. Incrementa con cada evento. Se usa para control de concurrencia.
Por que es importante: Permite detectar si el agregado cambio entre lectura y escritura. Si la version no coincide, hubo modificacion concurrente.
Ejemplo practico: Un pedido nuevo tiene version -1 (sin eventos). Despues de OrderCreated tiene version 0. Despues de ItemAdded tiene version 1.
Subscripcion (Subscription)
Definicion: Mecanismo para recibir notificaciones de nuevos eventos a medida que se agregan al Event Store.
Por que es importante: Permite que proyecciones y otros sistemas reaccionen a eventos en tiempo real sin polling constante.
Ejemplo practico: La proyeccion de pedidos se subscribe desde posicion global 5000. Cuando llega un nuevo evento (posicion 5001), recibe notificacion automatica.
EventStoreDB
Definicion: Base de datos especializada disenada exclusivamente para Event Sourcing, con soporte nativo para streams, subscripciones y proyecciones.
Por que es importante: Ofrece caracteristicas optimizadas que serian dificiles de implementar en bases de datos genericas: proyecciones nativas, subscripciones persistentes, competencia entre consumidores.
Ejemplo practico: En lugar de implementar tu propio Event Store en PostgreSQL, usas EventStoreDB que ya tiene todo resuelto: client.appendToStream(), client.subscribeToAll().
Idempotencia
Definicion: Propiedad de una operacion que produce el mismo resultado sin importar cuantas veces se ejecute.
Por que es importante: En sistemas distribuidos, los mensajes pueden duplicarse. La idempotencia garantiza que procesar el mismo evento dos veces no corrompe los datos.
Ejemplo practico: Si la proyeccion recibe OrderCreated con eventId “abc” dos veces, la segunda vez detecta que ya proceso ese eventId y lo ignora.
← Capítulo 3: Agregados | Capítulo 5: Proyecciones y Read Models →