Capítulo 5: Proyecciones y Read Models
Capítulo 5: Proyecciones y Read Models
“Los eventos cuentan la historia, las proyecciones responden preguntas”
¿Qué son las Proyecciones?
Imagina que tienes 1 millón de eventos de pedidos. Si un usuario pregunta “¿Cuáles son mis pedidos pendientes?”, no querrás recorrer todos los eventos cada vez.
Una proyección es un proceso que escucha eventos y construye estructuras de datos optimizadas para responder preguntas específicas. Es como crear un índice o una vista materializada, pero con control total sobre su estructura.
El resultado de una proyección se llama Read Model (modelo de lectura): una tabla, documento, o estructura de datos diseñada para responder una consulta específica de forma rápida.
Una proyección transforma eventos en una vista optimizada para consultas específicas:
graph LR
E1[OrderCreated] --> P[Proyección]
E2[OrderItemAdded] --> P
E3[OrderConfirmed] --> P
P --> RM[Read Model]
RM --> Q[Queries]
Tipos de Proyecciones
Existen diferentes estrategias para mantener los read models actualizados. Cada una tiene sus trade-offs en términos de consistencia, rendimiento y complejidad.
1. Live Projection (En memoria)
La forma más simple: cada vez que necesitas datos, lees todos los eventos y los procesas en memoria. No hay read model persistente.
Ventajas: Siempre consistente, simple de implementar Desventajas: Lento para streams con muchos eventos, no escala
Reconstruye el estado leyendo todos los eventos:
// Cada consulta lee y proyecta eventos
async function getOrderSummary(orderId: string): Promise<OrderSummary> {
const events = await eventStore.readStream(`order-${orderId}`);
return events.reduce((summary, event) => {
switch (event.eventType) {
case 'OrderCreated':
return { ...summary, id: event.data.orderId, status: 'draft', items: [] };
case 'OrderItemAdded':
return { ...summary, items: [...summary.items, event.data] };
case 'OrderConfirmed':
return { ...summary, status: 'confirmed' };
default:
return summary;
}
}, {} as OrderSummary);
}
2. Inline Projection (Sincronica)
Actualiza el read model en la misma transacción que guarda el evento. Garantiza consistencia inmediata.
Ventajas: Read model siempre actualizado, consistencia fuerte Desventajas: Más lento al escribir, acoplamiento entre escritura y lectura
Actualiza el read model en la misma transacción:
async function confirmOrder(orderId: string): Promise<void> {
await db.transaction(async (tx) => {
// 1. Append event
await tx.insert(eventsTable).values({
streamId: `order-${orderId}`,
eventType: 'OrderConfirmed',
data: { orderId, confirmedAt: new Date() }
});
// 2. Update read model (same transaction)
await tx.update(ordersView)
.set({ status: 'confirmed' })
.where(eq(ordersView.id, orderId));
});
}
3. Async Projection (Eventual)
Un proceso separado (worker) lee eventos y actualiza el read model de forma asíncrona. El read model puede estar “atrasado” por milisegundos o segundos.
Ventajas: Escrituras rápidas, escalable, múltiples proyecciones independientes Desventajas: Consistencia eventual (el usuario podría no ver su cambio inmediatamente)
El checkpoint es crucial aquí: es la posición del último evento procesado. Si el worker se reinicia, continúa desde el checkpoint.
Procesa eventos en background:
class OrderProjectionWorker {
private checkpoint: bigint = 0n;
async start(): Promise<void> {
while (true) {
const events = await eventStore.readAll(this.checkpoint);
for (const event of events) {
await this.handle(event);
this.checkpoint = event.globalPosition + 1n;
await this.saveCheckpoint();
}
await sleep(100); // Polling interval
}
}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.handleOrderCreated(event);
break;
case 'OrderConfirmed':
await this.handleOrderConfirmed(event);
break;
}
}
}
Implementación de Proyecciones
TypeScript: Proyección de Órdenes
// src/application/projections/orders-projection.ts
import { sql } from 'drizzle-orm';
interface OrderView {
id: string;
customerId: string;
status: string;
itemCount: number;
total: number;
createdAt: Date;
updatedAt: Date;
}
export class OrdersProjection {
constructor(private db: Database) {}
async handle(event: StoredEvent): Promise<void> {
const handlers: Record<string, (e: StoredEvent) => Promise<void>> = {
'OrderCreated': this.onOrderCreated.bind(this),
'OrderItemAdded': this.onOrderItemAdded.bind(this),
'OrderItemRemoved': this.onOrderItemRemoved.bind(this),
'OrderConfirmed': this.onOrderConfirmed.bind(this),
'OrderShipped': this.onOrderShipped.bind(this),
'OrderCancelled': this.onOrderCancelled.bind(this),
};
const handler = handlers[event.eventType];
if (handler) {
await handler(event);
}
}
private async onOrderCreated(event: StoredEvent): Promise<void> {
const data = event.data as OrderCreatedPayload;
await this.db.insert(ordersView).values({
id: data.orderId,
customerId: data.customerId,
status: 'draft',
itemCount: 0,
total: 0,
createdAt: event.createdAt,
updatedAt: event.createdAt
});
}
private async onOrderItemAdded(event: StoredEvent): Promise<void> {
const data = event.data as OrderItemAddedPayload;
const itemTotal = data.quantity * data.unitPrice;
await this.db.update(ordersView)
.set({
itemCount: sql`item_count + 1`,
total: sql`total + ${itemTotal}`,
updatedAt: event.createdAt
})
.where(eq(ordersView.id, data.orderId));
}
private async onOrderConfirmed(event: StoredEvent): Promise<void> {
const data = event.data as OrderConfirmedPayload;
await this.db.update(ordersView)
.set({
status: 'confirmed',
updatedAt: event.createdAt
})
.where(eq(ordersView.id, data.orderId));
}
private async onOrderCancelled(event: StoredEvent): Promise<void> {
const data = event.data as OrderCancelledPayload;
await this.db.update(ordersView)
.set({
status: 'cancelled',
updatedAt: event.createdAt
})
.where(eq(ordersView.id, data.orderId));
}
}
Go: Proyección con Elasticsearch
// application/projections/orders_projection.go
package projections
import (
"context"
"encoding/json"
"github.com/elastic/go-elasticsearch/v8"
)
type OrderView struct {
ID string `json:"id"`
CustomerID string `json:"customerId"`
Status string `json:"status"`
ItemCount int `json:"itemCount"`
Total float64 `json:"total"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
type OrdersProjection struct {
es *elasticsearch.Client
}
func NewOrdersProjection(es *elasticsearch.Client) *OrdersProjection {
return &OrdersProjection{es: es}
}
func (p *OrdersProjection) Handle(ctx context.Context, event StoredEvent) error {
switch event.EventType {
case "OrderCreated":
return p.onOrderCreated(ctx, event)
case "OrderItemAdded":
return p.onOrderItemAdded(ctx, event)
case "OrderConfirmed":
return p.onOrderConfirmed(ctx, event)
default:
return nil
}
}
func (p *OrdersProjection) onOrderCreated(ctx context.Context, event StoredEvent) error {
var data OrderCreatedPayload
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
view := OrderView{
ID: data.OrderID,
CustomerID: data.CustomerID,
Status: "draft",
ItemCount: 0,
Total: 0,
CreatedAt: event.CreatedAt,
UpdatedAt: event.CreatedAt,
}
body, _ := json.Marshal(view)
_, err := p.es.Index(
"orders",
bytes.NewReader(body),
p.es.Index.WithDocumentID(data.OrderID),
p.es.Index.WithContext(ctx),
)
return err
}
func (p *OrdersProjection) onOrderItemAdded(ctx context.Context, event StoredEvent) error {
var data OrderItemAddedPayload
if err := json.Unmarshal(event.Data, &data); err != nil {
return err
}
script := fmt.Sprintf(`
ctx._source.itemCount += 1;
ctx._source.total += %f;
ctx._source.updatedAt = '%s';
`, data.Quantity*data.UnitPrice, event.CreatedAt.Format(time.RFC3339))
_, err := p.es.Update(
"orders",
data.OrderID,
strings.NewReader(fmt.Sprintf(`{"script": {"source": %q}}`, script)),
p.es.Update.WithContext(ctx),
)
return err
}
Python: Proyección con Redis
# application/projections/orders_projection.py
import json
from redis.asyncio import Redis
class OrdersProjection:
def __init__(self, redis: Redis):
self.redis = redis
async def handle(self, event: StoredEvent) -> None:
handlers = {
"OrderCreated": self._on_order_created,
"OrderItemAdded": self._on_order_item_added,
"OrderConfirmed": self._on_order_confirmed,
}
handler = handlers.get(event.event_type)
if handler:
await handler(event)
async def _on_order_created(self, event: StoredEvent) -> None:
data = event.data
view = {
"id": data["orderId"],
"customerId": data["customerId"],
"status": "draft",
"itemCount": 0,
"total": 0,
"createdAt": event.created_at.isoformat(),
"updatedAt": event.created_at.isoformat(),
}
await self.redis.hset(
f"order:{data['orderId']}",
mapping={k: json.dumps(v) if isinstance(v, (dict, list)) else str(v)
for k, v in view.items()}
)
# Add to customer's orders list
await self.redis.sadd(
f"customer:{data['customerId']}:orders",
data["orderId"]
)
async def _on_order_item_added(self, event: StoredEvent) -> None:
data = event.data
order_key = f"order:{data['orderId']}"
await self.redis.hincrby(order_key, "itemCount", 1)
await self.redis.hincrbyfloat(
order_key,
"total",
data["quantity"] * data["unitPrice"]
)
await self.redis.hset(
order_key,
"updatedAt",
event.created_at.isoformat()
)
async def _on_order_confirmed(self, event: StoredEvent) -> None:
data = event.data
order_key = f"order:{data['orderId']}"
await self.redis.hset(order_key, "status", "confirmed")
await self.redis.hset(
order_key,
"updatedAt",
event.created_at.isoformat()
)
Multiples Proyecciones
Una de las grandes ventajas de Event Sourcing: un mismo evento puede alimentar múltiples read models.
Cada proyección crea una vista optimizada para un caso de uso específico. No necesitas un modelo único que sirva para todo.
Un evento puede alimentar múltiples read models:
// src/application/projection-manager.ts
export class ProjectionManager {
private projections: Projection[] = [];
private checkpoint: bigint = 0n;
register(projection: Projection): void {
this.projections.push(projection);
}
async processEvents(): Promise<void> {
const events = await this.eventStore.readAll(this.checkpoint);
for (const event of events) {
await Promise.all(
this.projections.map(p => p.handle(event))
);
this.checkpoint = event.globalPosition + 1n;
}
}
}
// Uso
const manager = new ProjectionManager(eventStore);
manager.register(new OrdersProjection(db)); // Vista de órdenes
manager.register(new CustomerStatsProjection(db)); // Estadísticas por cliente
manager.register(new SalesReportProjection(db)); // Reportes de ventas
manager.register(new InventoryProjection(db)); // Actualización de inventario
Rebuild de Proyecciones
Una característica poderosa: puedes reconstruir cualquier proyección desde cero en cualquier momento.
¿Por qué querrías hacer esto?
- Corregiste un bug en la lógica de la proyección
- Agregaste nuevos campos al read model
- El read model se corrompió
- Quieres crear una nueva proyección para datos históricos
El proceso es simple: eliminas los datos del read model, reinicias el checkpoint a 0, y reproduces todos los eventos.
Capacidad de reconstruir desde cero:
async function rebuildProjection(projection: Projection): Promise<void> {
console.log('Clearing existing data...');
await projection.clear();
console.log('Replaying all events...');
let checkpoint = 0n;
let count = 0;
while (true) {
const events = await eventStore.readAll(checkpoint, 1000);
if (events.length === 0) break;
for (const event of events) {
await projection.handle(event);
count++;
}
checkpoint = events[events.length - 1].globalPosition + 1n;
console.log(`Processed ${count} events...`);
}
console.log(`Rebuild complete. Total events: ${count}`);
}
Resumen
- Las proyecciones transforman eventos en vistas de lectura
- Pueden ser live (tiempo real), inline (sincrónicas) o async (eventuales)
- Cada proyección mantiene su checkpoint de posición
- Múltiples proyecciones pueden consumir los mismos eventos
- Las proyecciones se pueden reconstruir completamente
Glosario
Proyeccion (Projection)
Definicion: Proceso que escucha eventos y los transforma en estructuras de datos optimizadas para consultas especificas.
Por que es importante: Los eventos son excelentes para capturar cambios pero ineficientes para consultas. Las proyecciones crean vistas rapidas sin modificar la fuente de verdad (los eventos).
Ejemplo practico: La proyeccion OrdersProjection escucha OrderCreated, OrderShipped, etc., y mantiene una tabla orders_view con columnas id, status, total, customer_name para mostrar listas de pedidos rapidamente.
Read Model (Modelo de Lectura)
Definicion: Estructura de datos creada y mantenida por una proyeccion, disenada para responder consultas especificas de forma eficiente.
Por que es importante: Permite tener multiples vistas de los mismos datos, cada una optimizada para un caso de uso diferente.
Ejemplo practico: De los mismos eventos de pedidos, creas: orders_view (lista de pedidos), customer_stats (estadisticas por cliente), daily_sales_report (ventas por dia). Cada uno es un read model distinto.
Checkpoint
Definicion: Posicion del ultimo evento que una proyeccion ha procesado exitosamente. Se persiste para sobrevivir reinicios.
Por que es importante: Si el worker de proyecciones se reinicia, el checkpoint indica donde continuar. Sin checkpoint, tendrias que reprocesar todos los eventos desde el inicio.
Ejemplo practico: La proyeccion orders tiene checkpoint en posicion global 50000. Al reiniciar, consulta eventos desde 50001 en adelante, sin reprocesar los 50000 anteriores.
Consistencia Eventual
Definicion: Modelo de consistencia donde los cambios se propagan gradualmente y las lecturas pueden mostrar datos desactualizados temporalmente.
Por que es importante: Permite sistemas mas escalables y resilientes. El trade-off es que un usuario podria no ver su cambio inmediatamente (tipicamente milisegundos a segundos de delay).
Ejemplo practico: El usuario crea un pedido. El evento se guarda. El usuario ve “Pedido creado”. Pero si inmediatamente va a “Mis pedidos”, podria no verlo aun porque la proyeccion no ha procesado el evento todavia.
Live Projection
Definicion: Proyeccion que se ejecuta en tiempo real cada vez que se necesitan los datos, leyendo y procesando eventos en memoria.
Por que es importante: Siempre consistente, sin lag. Util para agregados individuales con pocos eventos.
Ejemplo practico: Para mostrar el detalle de un pedido especifico, lees los 10 eventos del stream order-123 y los proyectas en memoria. No necesitas tabla persistente para esto.
Inline Projection
Definicion: Proyeccion que actualiza el read model en la misma transaccion de base de datos que guarda el evento.
Por que es importante: Garantiza que el read model siempre esta actualizado. No hay ventana de inconsistencia.
Ejemplo practico: Dentro de db.transaction(), guardas el evento OrderCreated Y actualizas orders_view en la misma operacion atomica.
Async Projection
Definicion: Proyeccion que procesa eventos en un proceso separado (worker), de forma asincrona y eventualmente consistente.
Por que es importante: Desacopla escritura de lectura. Las escrituras son rapidas, las proyecciones procesan a su ritmo, y puedes escalar proyecciones independientemente.
Ejemplo practico: Un worker en background lee nuevos eventos cada 100ms, los procesa, y actualiza orders_view. Si hay pico de trafico, las escrituras no se ralentizan; las proyecciones simplemente se atrasan temporalmente.
Rebuild (Reconstruccion)
Definicion: Proceso de eliminar un read model y reconstruirlo desde cero reproduciendo todos los eventos historicos.
Por que es importante: Permite corregir bugs en proyecciones, agregar nuevos campos, o crear nuevas vistas de datos historicos sin perder informacion.
Ejemplo practico: Descubres que orders_view no calculaba bien los totales. Limpias la tabla, reiniciamos checkpoint a 0, y dejas que el worker reprocese los 100,000 eventos. El read model queda corregido.
Polling
Definicion: Tecnica de consultar periodicamente por nuevos datos en lugar de recibir notificaciones (push).
Por que es importante: Simple de implementar, funciona con cualquier base de datos. El trade-off es latencia (delay entre evento y procesamiento) y uso de recursos.
Ejemplo practico: El projection worker hace SELECT * FROM events WHERE global_position > checkpoint cada 100 milisegundos para detectar nuevos eventos.
CQRS (Command Query Responsibility Segregation)
Definicion: Patron que separa los modelos de escritura (comandos) de los modelos de lectura (queries).
Por que es importante: Permite optimizar cada lado independientemente. Event Sourcing naturalmente lleva a CQRS: escribes eventos, lees de proyecciones.
Ejemplo practico: Para crear pedido, usas el agregado Order (modelo de escritura). Para listar pedidos, consultas orders_view (modelo de lectura). Son estructuras completamente diferentes.