Capítulo 10: Proyecciones en Tiempo Real
Capítulo 10: Proyecciones en Tiempo Real
“Las proyecciones transforman el stream de eventos en vistas útiles”
En este capítulo implementaremos proyecciones asíncronas completas. Verás:
- ProjectionWorker: El proceso que coordina todas las proyecciones
- Checkpoint: Cómo cada proyección recuerda su posición
- Múltiples proyecciones: Diferentes vistas de los mismos eventos
- Rebuild: Cómo reconstruir una proyección desde cero
Arquitectura de Proyecciones
graph LR
ES[Event Store] --> PW[Projection Worker]
PW --> P1[Orders View]
PW --> P2[Customer Stats]
PW --> P3[Sales Report]
PW --> CP[Checkpoint Store]
Projection Worker
El ProjectionWorker es el proceso central que coordina las proyecciones. Sus responsabilidades:
- Mantiene registro de las proyecciones registradas
- Lee eventos desde el Event Store
- Distribuye eventos a cada proyección
- Guarda checkpoints para sobrevivir reinicios
- Proporciona capacidad de rebuild
El patrón de polling (while(running) { process(); sleep(); }) es simple y robusto. Para mayor rendimiento, puedes usar subscripciones push si tu Event Store las soporta.
// src/application/projections/projection-worker.ts
import { EventStore, StoredEvent } from '@infrastructure/event-store/types';
import { Database } from '@infrastructure/database/connection';
import { checkpoints } from '@infrastructure/database/schema';
import { eq } from 'drizzle-orm';
export interface Projection {
name: string;
handle(event: StoredEvent): Promise<void>;
clear(): Promise<void>;
}
export class ProjectionWorker {
private running = false;
private projections: Projection[] = [];
constructor(
private eventStore: EventStore,
private db: Database,
private pollInterval = 100
) {}
register(projection: Projection): void {
this.projections.push(projection);
}
async start(): Promise<void> {
this.running = true;
console.log('Projection worker started');
while (this.running) {
try {
await this.processEvents();
} catch (error) {
console.error('Projection error:', error);
}
await this.sleep(this.pollInterval);
}
}
stop(): void {
this.running = false;
console.log('Projection worker stopped');
}
private async processEvents(): Promise<void> {
for (const projection of this.projections) {
const checkpoint = await this.getCheckpoint(projection.name);
const events = await this.eventStore.readAll(checkpoint, 100);
for (const event of events) {
await projection.handle(event);
await this.saveCheckpoint(
projection.name,
event.globalPosition + 1n
);
}
}
}
private async getCheckpoint(name: string): Promise<bigint> {
const [row] = await this.db
.select()
.from(checkpoints)
.where(eq(checkpoints.projectionName, name));
return row ? BigInt(row.position) : 0n;
}
private async saveCheckpoint(name: string, position: bigint): Promise<void> {
await this.db
.insert(checkpoints)
.values({ projectionName: name, position: position.toString() })
.onConflictDoUpdate({
target: checkpoints.projectionName,
set: { position: position.toString(), updatedAt: new Date() }
});
}
private sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
async rebuild(projectionName: string): Promise<void> {
const projection = this.projections.find(p => p.name === projectionName);
if (!projection) {
throw new Error(`Projection not found: ${projectionName}`);
}
console.log(`Rebuilding projection: ${projectionName}`);
await projection.clear();
await this.db
.delete(checkpoints)
.where(eq(checkpoints.projectionName, projectionName));
let checkpoint = 0n;
let count = 0;
while (true) {
const events = await this.eventStore.readAll(checkpoint, 1000);
if (events.length === 0) break;
for (const event of events) {
await projection.handle(event);
count++;
}
checkpoint = events[events.length - 1].globalPosition + 1n;
await this.saveCheckpoint(projectionName, checkpoint);
console.log(`Processed ${count} events...`);
}
console.log(`Rebuild complete. Total: ${count} events`);
}
}
Orders Projection
Cada proyección implementa la interfaz Projection con:
- name: Identificador único para checkpoints
- handle(event): Procesa un evento y actualiza el read model
- clear(): Elimina todos los datos (usado en rebuild)
La proyección decide qué eventos le interesan y qué hacer con cada uno. Eventos no relevantes se ignoran.
// src/application/projections/orders-projection.ts
import { eq, sql } from 'drizzle-orm';
import { Database } from '@infrastructure/database/connection';
import { ordersView, orderItemsView } from '@infrastructure/database/schema';
import { StoredEvent } from '@infrastructure/event-store/types';
import { Projection } from './projection-worker';
export class OrdersProjection implements Projection {
name = 'orders';
constructor(private db: Database) {}
async handle(event: StoredEvent): Promise<void> {
const handlers: Record<string, (e: StoredEvent) => Promise<void>> = {
OrderCreated: this.onOrderCreated.bind(this),
OrderItemAdded: this.onOrderItemAdded.bind(this),
OrderItemRemoved: this.onOrderItemRemoved.bind(this),
OrderConfirmed: this.onStatusChange.bind(this, 'confirmed'),
PaymentReceived: this.onStatusChange.bind(this, 'paid'),
OrderShipped: this.onOrderShipped.bind(this),
OrderDelivered: this.onStatusChange.bind(this, 'delivered'),
OrderCancelled: this.onStatusChange.bind(this, 'cancelled')
};
const handler = handlers[event.eventType];
if (handler) {
await handler(event);
}
}
async clear(): Promise<void> {
await this.db.delete(orderItemsView);
await this.db.delete(ordersView);
}
private async onOrderCreated(event: StoredEvent): Promise<void> {
const data = event.data as any;
await this.db.insert(ordersView).values({
id: event.streamId.replace('order-', ''),
customerId: data.customerId,
status: 'draft',
itemCount: data.items.length,
total: data.total.amount.toString(),
createdAt: event.createdAt,
updatedAt: event.createdAt
});
for (const item of data.items) {
await this.db.insert(orderItemsView).values({
orderId: event.streamId.replace('order-', ''),
productId: item.productId,
productName: item.productName,
quantity: item.quantity,
unitPrice: item.unitPrice.amount.toString()
});
}
}
private async onOrderItemAdded(event: StoredEvent): Promise<void> {
const data = event.data as any;
const orderId = event.streamId.replace('order-', '');
await this.db.insert(orderItemsView).values({
orderId,
productId: data.item.productId,
productName: data.item.productName,
quantity: data.item.quantity,
unitPrice: data.item.unitPrice.amount.toString()
});
await this.db.update(ordersView)
.set({
itemCount: sql`item_count + 1`,
total: data.newTotal.amount.toString(),
updatedAt: event.createdAt
})
.where(eq(ordersView.id, orderId));
}
private async onOrderItemRemoved(event: StoredEvent): Promise<void> {
const data = event.data as any;
const orderId = event.streamId.replace('order-', '');
await this.db.delete(orderItemsView)
.where(eq(orderItemsView.productId, data.productId));
await this.db.update(ordersView)
.set({
itemCount: sql`item_count - 1`,
total: data.newTotal.amount.toString(),
updatedAt: event.createdAt
})
.where(eq(ordersView.id, orderId));
}
private async onStatusChange(
status: string,
event: StoredEvent
): Promise<void> {
const orderId = event.streamId.replace('order-', '');
await this.db.update(ordersView)
.set({ status, updatedAt: event.createdAt })
.where(eq(ordersView.id, orderId));
}
private async onOrderShipped(event: StoredEvent): Promise<void> {
const data = event.data as any;
const orderId = event.streamId.replace('order-', '');
await this.db.update(ordersView)
.set({
status: 'shipped',
updatedAt: event.createdAt
})
.where(eq(ordersView.id, orderId));
}
}
Customer Stats Projection
Esta proyección demuestra cómo crear un read model completamente diferente de los mismos eventos.
Mientras OrdersProjection crea una vista de pedidos individuales, CustomerStatsProjection crea estadísticas agregadas por cliente: total de pedidos, monto gastado, etc.
Nota el uso de onConflictDoUpdate (upsert): si el cliente no existe lo crea, si existe actualiza los contadores.
// src/application/projections/customer-stats-projection.ts
import { eq, sql } from 'drizzle-orm';
import { pgTable, varchar, integer, decimal, timestamp } from 'drizzle-orm/pg-core';
import { Database } from '@infrastructure/database/connection';
import { StoredEvent } from '@infrastructure/event-store/types';
import { Projection } from './projection-worker';
// Schema para estadísticas de cliente
export const customerStats = pgTable('customer_stats', {
customerId: varchar('customer_id', { length: 255 }).primaryKey(),
totalOrders: integer('total_orders').default(0),
completedOrders: integer('completed_orders').default(0),
cancelledOrders: integer('cancelled_orders').default(0),
totalSpent: decimal('total_spent', { precision: 12, scale: 2 }).default('0'),
lastOrderAt: timestamp('last_order_at', { withTimezone: true }),
updatedAt: timestamp('updated_at', { withTimezone: true })
});
export class CustomerStatsProjection implements Projection {
name = 'customer-stats';
constructor(private db: Database) {}
async handle(event: StoredEvent): Promise<void> {
switch (event.eventType) {
case 'OrderCreated':
await this.onOrderCreated(event);
break;
case 'OrderDelivered':
await this.onOrderDelivered(event);
break;
case 'OrderCancelled':
await this.onOrderCancelled(event);
break;
}
}
async clear(): Promise<void> {
await this.db.delete(customerStats);
}
private async onOrderCreated(event: StoredEvent): Promise<void> {
const data = event.data as any;
await this.db
.insert(customerStats)
.values({
customerId: data.customerId,
totalOrders: 1,
completedOrders: 0,
cancelledOrders: 0,
totalSpent: '0',
lastOrderAt: event.createdAt,
updatedAt: event.createdAt
})
.onConflictDoUpdate({
target: customerStats.customerId,
set: {
totalOrders: sql`${customerStats.totalOrders} + 1`,
lastOrderAt: event.createdAt,
updatedAt: event.createdAt
}
});
}
private async onOrderDelivered(event: StoredEvent): Promise<void> {
// Necesitamos el customerId y total del evento OrderCreated
// En producción, esto se almacenaría en el evento o se consultaría
const orderId = event.streamId;
const orderEvents = await this.getOrderEvents(orderId);
const createdEvent = orderEvents.find(e => e.eventType === 'OrderCreated');
if (!createdEvent) return;
const data = createdEvent.data as any;
await this.db.update(customerStats)
.set({
completedOrders: sql`${customerStats.completedOrders} + 1`,
totalSpent: sql`${customerStats.totalSpent} + ${data.total.amount}`,
updatedAt: event.createdAt
})
.where(eq(customerStats.customerId, data.customerId));
}
private async onOrderCancelled(event: StoredEvent): Promise<void> {
const orderId = event.streamId;
const orderEvents = await this.getOrderEvents(orderId);
const createdEvent = orderEvents.find(e => e.eventType === 'OrderCreated');
if (!createdEvent) return;
const data = createdEvent.data as any;
await this.db.update(customerStats)
.set({
cancelledOrders: sql`${customerStats.cancelledOrders} + 1`,
updatedAt: event.createdAt
})
.where(eq(customerStats.customerId, data.customerId));
}
private async getOrderEvents(streamId: string): Promise<StoredEvent[]> {
// En una implementación real, inyectarías el EventStore
return [];
}
}
Uso del Projection Worker
// src/api/server.ts
import { Hono } from 'hono';
import { serve } from '@hono/node-server';
import { db } from '@infrastructure/database/connection';
import { PostgresEventStore } from '@infrastructure/event-store/postgres-event-store';
import { ProjectionWorker } from '@application/projections/projection-worker';
import { OrdersProjection } from '@application/projections/orders-projection';
import { CustomerStatsProjection } from '@application/projections/customer-stats-projection';
const app = new Hono();
const eventStore = new PostgresEventStore(db);
// Iniciar projection worker
const worker = new ProjectionWorker(eventStore, db);
worker.register(new OrdersProjection(db));
worker.register(new CustomerStatsProjection(db));
// Iniciar en background
worker.start();
// Endpoint para rebuild manual
app.post('/admin/projections/:name/rebuild', async (c) => {
const name = c.req.param('name');
try {
await worker.rebuild(name);
return c.json({ success: true, message: `Projection ${name} rebuilt` });
} catch (error) {
return c.json({ error: (error as Error).message }, 400);
}
});
// Graceful shutdown
process.on('SIGTERM', () => {
worker.stop();
process.exit(0);
});
serve({ fetch: app.fetch, port: 3000 });
Query Service
El Query Service (o Read Service) proporciona métodos para consultar los read models.
Separar las consultas en un servicio dedicado:
- Aísla la lógica de consultas de la lógica de comandos (CQRS)
- Facilita optimizaciones específicas de lectura
- Proporciona una API clara para la capa de presentación
// src/application/queries/order-queries.ts
import { eq, desc, and, gte, lte } from 'drizzle-orm';
import { Database } from '@infrastructure/database/connection';
import { ordersView, orderItemsView } from '@infrastructure/database/schema';
export interface OrderListItem {
id: string;
customerId: string;
status: string;
itemCount: number;
total: number;
createdAt: Date;
}
export interface OrderDetail extends OrderListItem {
items: Array<{
productId: string;
productName: string;
quantity: number;
unitPrice: number;
}>;
}
export class OrderQueryService {
constructor(private db: Database) {}
async getOrderById(orderId: string): Promise<OrderDetail | null> {
const [order] = await this.db
.select()
.from(ordersView)
.where(eq(ordersView.id, orderId));
if (!order) return null;
const items = await this.db
.select()
.from(orderItemsView)
.where(eq(orderItemsView.orderId, orderId));
return {
id: order.id,
customerId: order.customerId,
status: order.status,
itemCount: order.itemCount ?? 0,
total: parseFloat(order.total ?? '0'),
createdAt: order.createdAt ?? new Date(),
items: items.map(i => ({
productId: i.productId,
productName: i.productName,
quantity: i.quantity,
unitPrice: parseFloat(i.unitPrice)
}))
};
}
async getOrdersByCustomer(
customerId: string,
limit = 20
): Promise<OrderListItem[]> {
const orders = await this.db
.select()
.from(ordersView)
.where(eq(ordersView.customerId, customerId))
.orderBy(desc(ordersView.createdAt))
.limit(limit);
return orders.map(o => ({
id: o.id,
customerId: o.customerId,
status: o.status,
itemCount: o.itemCount ?? 0,
total: parseFloat(o.total ?? '0'),
createdAt: o.createdAt ?? new Date()
}));
}
async getRecentOrders(limit = 50): Promise<OrderListItem[]> {
const orders = await this.db
.select()
.from(ordersView)
.orderBy(desc(ordersView.createdAt))
.limit(limit);
return orders.map(o => ({
id: o.id,
customerId: o.customerId,
status: o.status,
itemCount: o.itemCount ?? 0,
total: parseFloat(o.total ?? '0'),
createdAt: o.createdAt ?? new Date()
}));
}
}
Resumen
- El Projection Worker procesa eventos de forma asíncrona
- Cada proyección mantiene su checkpoint independiente
- Las proyecciones se pueden reconstruir desde cero
- Los Query Services consultan los read models
- La consistencia eventual es manejada por el sistema
Glosario
Projection Worker
Definicion: Proceso de larga duracion que coordina la ejecucion de proyecciones, leyendo eventos y distribuyendolos a cada proyeccion registrada.
Por que es importante: Centraliza la logica de polling, checkpoint, y manejo de errores. Las proyecciones solo se preocupan de procesar eventos individuales.
Ejemplo practico: El worker se inicia al arrancar la aplicacion, corre en background, y procesa eventos continuamente hasta que se detiene con worker.stop().
Checkpoint Store
Definicion: Tabla o almacen donde se persiste la posicion del ultimo evento procesado por cada proyeccion.
Por que es importante: Permite que las proyecciones sobrevivan reinicios sin reprocesar todos los eventos. Cada proyeccion tiene su checkpoint independiente.
Ejemplo practico: Tabla checkpoints con projection_name y position. La proyeccion orders tiene position 50000, mientras customer-stats tiene position 49500.
Graceful Shutdown
Definicion: Proceso de detener un servicio de forma ordenada, completando trabajo en progreso antes de terminar.
Por que es importante: Evita corrupcion de datos y perdida de trabajo. El worker termina el batch actual antes de cerrarse.
Ejemplo practico: process.on('SIGTERM', () => worker.stop()) intercepta la senal de terminacion y detiene el worker limpiamente en lugar de matarlo abruptamente.
Query Service
Definicion: Servicio que encapsula la logica de consultas a los read models, proporcionando una API limpia para la capa de presentacion.
Por que es importante: Separa claramente las operaciones de lectura de las de escritura (CQRS). Permite optimizar y cachear consultas independientemente.
Ejemplo practico: OrderQueryService.getOrdersByCustomer(customerId) oculta los detalles de la consulta SQL y retorna DTOs limpios para el frontend.
Polling vs Push
Definicion: Polling consulta periodicamente por nuevos datos; Push recibe notificaciones cuando hay datos nuevos.
Por que es importante: Polling es simple y funciona con cualquier base de datos. Push tiene menor latencia pero requiere infraestructura mas compleja.
Ejemplo practico: El worker hace SELECT * FROM events WHERE position > checkpoint cada 100ms (polling). Con EventStoreDB, usarias client.subscribeToAll() (push).
Upsert (ON CONFLICT DO UPDATE)
Definicion: Operacion que inserta un registro si no existe, o lo actualiza si ya existe.
Por que es importante: Simplifica logica de “crear o actualizar”. En proyecciones, el primer evento de un cliente lo crea; los siguientes lo actualizan.
Ejemplo practico: INSERT INTO customer_stats (...) ON CONFLICT (customer_id) DO UPDATE SET total_orders = total_orders + 1 maneja ambos casos en una query.
Batch Processing
Definicion: Procesar multiples elementos juntos en lugar de uno por uno.
Por que es importante: Reduce overhead de red y transacciones. Procesar 100 eventos en un batch es mas eficiente que 100 operaciones individuales.
Ejemplo practico: eventStore.readAll(checkpoint, 100) lee hasta 100 eventos a la vez. El worker los procesa secuencialmente pero la lectura es un solo round-trip.
Idempotencia en Proyecciones
Definicion: Capacidad de procesar el mismo evento multiples veces produciendo el mismo resultado.
Por que es importante: Si el worker falla despues de procesar un evento pero antes de guardar checkpoint, el evento se reprocesara. Debe ser seguro.
Ejemplo practico: INSERT ... ON CONFLICT DO NOTHING ignora duplicados. O verificar WHERE NOT EXISTS (SELECT 1 FROM processed_events WHERE event_id = ?).
Segregacion de Responsabilidad (CQRS)
Definicion: Separar completamente los modelos y servicios de lectura (Query) de los de escritura (Command).
Por que es importante: Permite optimizar cada lado independientemente. Las escrituras priorizan consistencia; las lecturas priorizan velocidad.
Ejemplo practico: Para crear pedido usas OrderCommandHandler -> Order -> EventStore. Para listar pedidos usas OrderQueryService -> orders_view. Flujos completamente separados.
Background Worker
Definicion: Proceso que corre continuamente en segundo plano, realizando trabajo sin interaccion directa del usuario.
Por que es importante: Las proyecciones deben procesarse independientemente de las requests HTTP. El worker corre aunque no haya usuarios activos.
Ejemplo practico: El ProjectionWorker se inicia al arrancar el servidor y corre indefinidamente: worker.start() no bloquea, corre en background.
← Capítulo 9: Agregado Order | Capítulo 11: Event Sourcing en Go →