Publicación de Eventos de Dominio
Publicacion de Eventos de Dominio
En este capitulo implementaremos el sistema de eventos que conecta el Write Model con el Read Model. Aprenderemos a definir eventos, publicarlos y suscribir proyecciones.
El Rol de los Eventos en CQRS
Los eventos de dominio son el puente entre el Write Model y el Read Model. Cuando algo importante ocurre en el dominio (se crea un pedido, se agrega un item, se confirma una compra), el agregado genera un evento que describe lo que paso.
Las proyecciones escuchan estos eventos y actualizan el Read Model en consecuencia. Este mecanismo permite que ambos modelos evolucionen independientemente pero permanezcan sincronizados.
Domain Events: Definiendo lo que Ocurrio
Los eventos se nombran en tiempo pasado porque describen algo que ya sucedio. No son intenciones (como los comandos) sino hechos consumados:
// src/domain/order/order.events.ts
import { BaseDomainEvent } from "../shared/domain-event";
export class OrderCreatedEvent extends BaseDomainEvent {
readonly eventType = "order.created";
constructor(
aggregateId: string,
readonly customerId: string
) {
super(aggregateId);
}
}
export class ItemAddedEvent extends BaseDomainEvent {
readonly eventType = "order.item_added";
constructor(
aggregateId: string,
readonly productId: string,
readonly quantity: number
) {
super(aggregateId);
}
}
export class OrderConfirmedEvent extends BaseDomainEvent {
readonly eventType = "order.confirmed";
constructor(
aggregateId: string,
readonly total: number
) {
super(aggregateId);
}
}
Cada evento contiene la informacion necesaria para que las proyecciones puedan actualizar el Read Model sin necesidad de consultar el Write Model.
Event Bus Interface: El Contrato
El Event Bus es similar al Command Bus pero con una diferencia clave: un evento puede tener multiples suscriptores, mientras que un comando tiene un unico handler:
// src/infrastructure/messaging/event-bus.ts
import { DomainEvent } from "@domain/shared/domain-event";
export interface EventBus {
publish(event: DomainEvent): Promise<void>;
subscribe<T extends DomainEvent>(
eventType: string,
handler: (event: T) => Promise<void>
): void;
}
In-Memory Event Bus - TypeScript
Para desarrollo y testing, una implementacion en memoria es suficiente:
// src/infrastructure/messaging/in-memory-event-bus.ts
import { DomainEvent } from "@domain/shared/domain-event";
import { EventBus } from "./event-bus";
type EventHandler = (event: DomainEvent) => Promise<void>;
export class InMemoryEventBus implements EventBus {
private handlers = new Map<string, EventHandler[]>();
subscribe<T extends DomainEvent>(
eventType: string,
handler: (event: T) => Promise<void>
): void {
const existing = this.handlers.get(eventType) || [];
this.handlers.set(eventType, [...existing, handler as EventHandler]);
}
async publish(event: DomainEvent): Promise<void> {
const handlers = this.handlers.get(event.eventType) || [];
await Promise.all(handlers.map(h => h(event)));
}
}
Event Bus - Go
// internal/infrastructure/messaging/event_bus.go
package messaging
import (
"context"
"sync"
"github.com/company/orderflow/internal/domain/shared"
)
type EventHandler func(ctx context.Context, event shared.DomainEvent) error
type EventBus interface {
Publish(ctx context.Context, event shared.DomainEvent) error
Subscribe(eventType string, handler EventHandler)
}
type InMemoryEventBus struct {
mu sync.RWMutex
handlers map[string][]EventHandler
}
func NewInMemoryEventBus() *InMemoryEventBus {
return &InMemoryEventBus{
handlers: make(map[string][]EventHandler),
}
}
func (b *InMemoryEventBus) Subscribe(eventType string, handler EventHandler) {
b.mu.Lock()
defer b.mu.Unlock()
b.handlers[eventType] = append(b.handlers[eventType], handler)
}
func (b *InMemoryEventBus) Publish(ctx context.Context, event shared.DomainEvent) error {
b.mu.RLock()
handlers := b.handlers[event.EventType()]
b.mu.RUnlock()
for _, h := range handlers {
if err := h(ctx, event); err != nil {
return err
}
}
return nil
}
Event Bus - Python
# src/orderflow/infrastructure/messaging/event_bus.py
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Callable, Awaitable
from ...domain.shared.domain_event import DomainEvent
EventHandler = Callable[[DomainEvent], Awaitable[None]]
class EventBus(ABC):
@abstractmethod
async def publish(self, event: DomainEvent) -> None: ...
@abstractmethod
def subscribe(self, event_type: str, handler: EventHandler) -> None: ...
class InMemoryEventBus(EventBus):
def __init__(self) -> None:
self._handlers: dict[str, list[EventHandler]] = defaultdict(list)
def subscribe(self, event_type: str, handler: EventHandler) -> None:
self._handlers[event_type].append(handler)
async def publish(self, event: DomainEvent) -> None:
for handler in self._handlers[event.event_type]:
await handler(event)
Order Projection: Actualizando el Read Model
La proyeccion es el componente que escucha eventos y actualiza el Read Model. Cada tipo de evento tiene su propio handler dentro de la proyeccion:
// src/infrastructure/projections/order.projection.ts
import { EventBus } from "../messaging/event-bus";
import { OrderCreatedEvent, OrderConfirmedEvent } from "@domain/order/order.events";
import { OrderReadRepository } from "./order-read.repository";
export class OrderProjection {
constructor(
private readonly eventBus: EventBus,
private readonly readRepository: OrderReadRepository
) {
this.registerHandlers();
}
private registerHandlers(): void {
this.eventBus.subscribe<OrderCreatedEvent>(
"order.created",
this.onOrderCreated.bind(this)
);
this.eventBus.subscribe<OrderConfirmedEvent>(
"order.confirmed",
this.onOrderConfirmed.bind(this)
);
}
private async onOrderCreated(event: OrderCreatedEvent): Promise<void> {
await this.readRepository.create({
id: event.aggregateId,
customerId: event.customerId,
status: "pending",
items: [],
total: 0,
createdAt: event.occurredAt.toISOString()
});
}
private async onOrderConfirmed(event: OrderConfirmedEvent): Promise<void> {
await this.readRepository.update(event.aggregateId, {
status: "confirmed",
total: event.total
});
}
}
Transactional Outbox: Para Produccion
En produccion, hay un problema: que pasa si guardas el agregado pero falla la publicacion del evento? El Write Model tiene el cambio pero el Read Model nunca se enterara.
El patron Outbox resuelve esto guardando los eventos en la misma transaccion que el agregado. Un proceso separado lee la tabla outbox y publica los eventos:
// src/infrastructure/persistence/postgres/outbox-order.repository.ts
async save(order: OrderAggregate): Promise<void> {
const client = await this.pool.connect();
try {
await client.query("BEGIN");
// Guarda orden
await client.query(
`INSERT INTO orders (id, customer_id, status, total) VALUES ($1, $2, $3, $4)`,
[order.id, order.customerId, order.status, order.total]
);
// Guarda eventos en outbox (misma transacción)
for (const event of order.pullEvents()) {
await client.query(
`INSERT INTO outbox (event_id, event_type, payload, created_at)
VALUES ($1, $2, $3, $4)`,
[event.eventId, event.eventType, JSON.stringify(event), new Date()]
);
}
await client.query("COMMIT");
} catch (error) {
await client.query("ROLLBACK");
throw error;
} finally {
client.release();
}
}
Con outbox, si la transaccion falla, ni el agregado ni los eventos se guardan. Si tiene exito, ambos se guardan atomicamente. Un proceso de polling o CDC (Change Data Capture) lee los eventos de outbox y los publica.
Proximos Pasos
En el siguiente capitulo implementaremos el Query Bus y sus handlers.
Glosario
Evento de Dominio (Domain Event)
Definicion: Objeto inmutable que representa algo significativo que ocurrio en el dominio. Esta escrito en tiempo pasado y contiene los datos relevantes del suceso.
Por que es importante: Es el mecanismo de comunicacion entre el Write Model y el Read Model. Permite desacoplamiento y sincronizacion asincrona.
Ejemplo practico: OrderCreatedEvent se emite cuando se crea un pedido. Contiene orderId y customerId, permitiendo que la proyeccion cree el registro en el Read Model.
Publish/Subscribe (Publicar/Suscribir)
Definicion: Patron de mensajeria donde los publicadores emiten mensajes a un canal y los suscriptores reciben copias de esos mensajes. Los publicadores no conocen a los suscriptores.
Por que es importante: Permite agregar nuevos consumidores de eventos sin modificar el codigo que los produce. Facilita la extension del sistema.
Ejemplo practico: OrderCreatedEvent puede tener multiples suscriptores: la proyeccion de pedidos, el servicio de notificaciones, el sistema de analytics, etc.
Proyeccion (Projection)
Definicion: Componente que escucha eventos de dominio y actualiza una representacion derivada de los datos (tipicamente el Read Model).
Por que es importante: Es el mecanismo que mantiene sincronizado el Read Model con el Write Model. Transforma eventos en actualizaciones del modelo de lectura.
Ejemplo practico: OrderProjection escucha OrderCreatedEvent y crea un registro en Elasticsearch con el nombre del cliente ya incluido (desnormalizado).
Outbox Pattern (Patron Outbox)
Definicion: Patron que garantiza la publicacion de eventos guardandolos en una tabla de la misma base de datos transaccional, en la misma transaccion que los datos de negocio.
Por que es importante: Resuelve el problema de consistencia entre guardar datos y publicar eventos. Sin outbox, podrias guardar el agregado pero perder el evento si falla la publicacion.
Ejemplo practico: Al guardar un pedido, tambien guardas OrderCreatedEvent en la tabla outbox. Un proceso separado lee esta tabla y publica los eventos al Event Bus.
Event Handler (Manejador de Evento)
Definicion: Funcion que se ejecuta cuando se publica un evento especifico. Recibe el evento como parametro y realiza alguna accion.
Por que es importante: Define la reaccion del sistema ante cada evento. Puede haber multiples handlers para el mismo tipo de evento.
Ejemplo practico: El handler para OrderConfirmedEvent en la proyeccion actualiza el status del pedido a “confirmed” en el Read Model.
Idempotencia
Definicion: Propiedad de una operacion que produce el mismo resultado aunque se ejecute multiples veces.
Por que es importante: En sistemas distribuidos, los eventos pueden llegar duplicados. Los handlers deben ser idempotentes para no corromper datos.
Ejemplo practico: Si OrderCreatedEvent llega dos veces, el handler debe crear el registro solo una vez (usando upsert o verificando existencia).