← Volver al listado de tecnologías
Capítulo 8: Domain Events
Capítulo 8: Domain Events
¿Qué es un Domain Event?
Un Domain Event representa algo que ocurrió en el dominio y que es relevante para el negocio. Es un hecho inmutable del pasado.
flowchart LR
subgraph Origen["Agregado"]
A["Pedido.confirmar()"]
end
subgraph Evento["Domain Event"]
B["PedidoConfirmado\n- pedido_id\n- total\n- fecha"]
end
subgraph Handlers["Event Handlers"]
C["Notificar Cliente"]
D["Reservar Stock"]
E["Generar Factura"]
end
A -->|"emite"| B
B --> C
B --> D
B --> E
“Un domain event es algo que ocurrió en el dominio que quieres que otras partes del sistema conozcan.” — Eric Evans
Características de Domain Events
| Característica | Descripción |
|---|---|
| Inmutable | Una vez creado, no cambia |
| Pasado | Nombrado en tiempo pasado |
| Relevante | Significativo para el negocio |
| Autónomo | Contiene todos los datos necesarios |
| Con timestamp | Registra cuándo ocurrió |
Estructura Base
from dataclasses import dataclass, field
from datetime import datetime
from uuid import UUID, uuid4
from typing import Any
@dataclass(frozen=True)
class DomainEvent:
"""Clase base para todos los eventos de dominio"""
event_id: UUID = field(default_factory=uuid4)
ocurrido_en: datetime = field(default_factory=datetime.now)
version: int = 1
@property
def nombre_evento(self) -> str:
return self.__class__.__name__
def to_dict(self) -> dict[str, Any]:
"""Serialización para persistencia o mensajería"""
return {
"event_id": str(self.event_id),
"nombre": self.nombre_evento,
"ocurrido_en": self.ocurrido_en.isoformat(),
"version": self.version,
"datos": self._datos_especificos(),
}
def _datos_especificos(self) -> dict[str, Any]:
"""Sobrescribir en subclases"""
return {}
Eventos Específicos del Dominio
from decimal import Decimal
@dataclass(frozen=True)
class PedidoConfirmado(DomainEvent):
"""Emitido cuando un pedido es confirmado"""
pedido_id: UUID = None
cliente_id: UUID = None
total: Decimal = Decimal("0")
cantidad_productos: int = 0
direccion_envio: str = ""
def _datos_especificos(self) -> dict:
return {
"pedido_id": str(self.pedido_id),
"cliente_id": str(self.cliente_id),
"total": str(self.total),
"cantidad_productos": self.cantidad_productos,
}
@dataclass(frozen=True)
class PedidoCancelado(DomainEvent):
pedido_id: UUID = None
motivo: str = ""
solicitado_por: str = "" # "cliente" o "sistema"
@dataclass(frozen=True)
class PagoRecibido(DomainEvent):
pedido_id: UUID = None
monto: Decimal = Decimal("0")
metodo_pago: str = ""
referencia_transaccion: str = ""
procesador: str = ""
@dataclass(frozen=True)
class ProductoAgotado(DomainEvent):
producto_id: UUID = None
sku: str = ""
almacen_id: UUID = None
ultima_cantidad: int = 0
@dataclass(frozen=True)
class ClienteRegistrado(DomainEvent):
cliente_id: UUID = None
email: str = ""
nombre: str = ""
tipo_cliente: str = "regular"
@dataclass(frozen=True)
class ReservaCreada(DomainEvent):
reserva_id: UUID = None
habitacion_id: UUID = None
huesped_id: UUID = None
fecha_entrada: str = ""
fecha_salida: str = ""
precio_total: Decimal = Decimal("0")
Agregado con Eventos
from dataclasses import dataclass, field
@dataclass
class Pedido:
id: UUID = field(default_factory=uuid4)
cliente_id: UUID = None
_lineas: list["LineaPedido"] = field(default_factory=list)
_estado: "EstadoPedido" = None
_eventos: list[DomainEvent] = field(default_factory=list)
def confirmar(self) -> None:
if self._estado != EstadoPedido.PAGADO:
raise PedidoNoConfirmable("Pedido no está pagado")
self._estado = EstadoPedido.CONFIRMADO
# Registrar evento de dominio
self._eventos.append(
PedidoConfirmado(
pedido_id=self.id,
cliente_id=self.cliente_id,
total=self.total,
cantidad_productos=len(self._lineas),
)
)
def cancelar(self, motivo: str, solicitado_por: str = "cliente") -> None:
if self._estado in (EstadoPedido.ENVIADO, EstadoPedido.ENTREGADO):
raise PedidoNoCancelable()
self._estado = EstadoPedido.CANCELADO
self._eventos.append(
PedidoCancelado(
pedido_id=self.id,
motivo=motivo,
solicitado_por=solicitado_por,
)
)
def registrar_pago(
self, monto: Decimal, metodo: str, referencia: str
) -> None:
if self._estado != EstadoPedido.PENDIENTE_PAGO:
raise EstadoInvalido()
self._estado = EstadoPedido.PAGADO
self._eventos.append(
PagoRecibido(
pedido_id=self.id,
monto=monto,
metodo_pago=metodo,
referencia_transaccion=referencia,
)
)
# === Gestión de eventos ===
def eventos_pendientes(self) -> list[DomainEvent]:
"""Retorna copia de eventos pendientes"""
return list(self._eventos)
def limpiar_eventos(self) -> None:
"""Limpia eventos después de publicarlos"""
self._eventos.clear()
def obtener_y_limpiar_eventos(self) -> list[DomainEvent]:
"""Obtiene eventos y los limpia en una operación"""
eventos = list(self._eventos)
self._eventos.clear()
return eventos
Despachador de Eventos
from typing import Callable, Type
from collections import defaultdict
class DespachadorEventos:
"""Publica eventos a sus handlers registrados"""
def __init__(self):
self._handlers: dict[Type[DomainEvent], list[Callable]] = defaultdict(list)
def registrar(
self,
tipo_evento: Type[DomainEvent],
handler: Callable[[DomainEvent], None]
) -> None:
"""Registra un handler para un tipo de evento"""
self._handlers[tipo_evento].append(handler)
def desregistrar(
self,
tipo_evento: Type[DomainEvent],
handler: Callable
) -> None:
"""Elimina un handler"""
if handler in self._handlers[tipo_evento]:
self._handlers[tipo_evento].remove(handler)
def despachar(self, evento: DomainEvent) -> None:
"""Despacha un evento a todos sus handlers"""
tipo = type(evento)
for handler in self._handlers[tipo]:
try:
handler(evento)
except Exception as e:
# Log error pero continuar con otros handlers
print(f"Error en handler: {e}")
def despachar_todos(self, eventos: list[DomainEvent]) -> None:
"""Despacha múltiples eventos en orden"""
for evento in eventos:
self.despachar(evento)
class DespachadorEventosAsync:
"""Versión asíncrona del despachador"""
def __init__(self, cola_mensajes: "ColaMensajes"):
self._cola = cola_mensajes
def despachar(self, evento: DomainEvent) -> None:
"""Encola el evento para procesamiento asíncrono"""
self._cola.publicar(
canal=evento.nombre_evento,
mensaje=evento.to_dict(),
)
Handlers de Eventos
class NotificadorPedidos:
"""Handler que envía notificaciones al confirmar pedidos"""
def __init__(self, servicio_email: "ServicioEmail"):
self._email = servicio_email
def al_confirmar_pedido(self, evento: PedidoConfirmado) -> None:
self._email.enviar(
destinatario=self._obtener_email_cliente(evento.cliente_id),
asunto=f"Pedido {evento.pedido_id} confirmado",
plantilla="pedido_confirmado",
datos={
"pedido_id": str(evento.pedido_id),
"total": str(evento.total),
"productos": evento.cantidad_productos,
},
)
def al_cancelar_pedido(self, evento: PedidoCancelado) -> None:
self._email.enviar(
destinatario=self._obtener_email_cliente_por_pedido(evento.pedido_id),
asunto=f"Pedido {evento.pedido_id} cancelado",
plantilla="pedido_cancelado",
datos={"motivo": evento.motivo},
)
class ActualizadorInventario:
"""Handler que actualiza inventario al confirmar pedidos"""
def __init__(self, repo_inventario: "RepositorioInventario"):
self._repo = repo_inventario
def al_confirmar_pedido(self, evento: PedidoConfirmado) -> None:
# Reservar stock para los productos del pedido
pedido = self._obtener_pedido(evento.pedido_id)
for linea in pedido.obtener_lineas():
self._repo.reservar_stock(
producto_id=linea.producto_id,
cantidad=linea.cantidad,
referencia=f"PEDIDO-{evento.pedido_id}",
)
def al_cancelar_pedido(self, evento: PedidoCancelado) -> None:
# Liberar stock reservado
self._repo.liberar_reserva(
referencia=f"PEDIDO-{evento.pedido_id}"
)
class GeneradorFacturas:
"""Handler que genera facturas al recibir pagos"""
def __init__(self, servicio_facturacion: "ServicioFacturacion"):
self._facturacion = servicio_facturacion
def al_recibir_pago(self, evento: PagoRecibido) -> None:
self._facturacion.generar_factura(
pedido_id=evento.pedido_id,
monto=evento.monto,
referencia_pago=evento.referencia_transaccion,
)
Integración en el Caso de Uso
class ConfirmarPedidoHandler:
def __init__(
self,
uow: "UnitOfWork",
despachador: DespachadorEventos
):
self._uow = uow
self._despachador = despachador
def ejecutar(self, comando: "ConfirmarPedidoCommand") -> None:
with self._uow:
pedido = self._uow.pedidos.obtener(comando.pedido_id)
if not pedido:
raise PedidoNoEncontrado(comando.pedido_id)
# Ejecutar lógica de negocio (genera eventos)
pedido.confirmar()
# Persistir cambios
self._uow.pedidos.guardar(pedido)
self._uow.commit()
# Despachar eventos DESPUÉS del commit
eventos = pedido.obtener_y_limpiar_eventos()
self._despachador.despachar_todos(eventos)
Configuración del Sistema
def configurar_handlers(despachador: DespachadorEventos) -> None:
"""Configura todos los handlers de eventos"""
# Servicios de infraestructura
servicio_email = ServicioEmailSMTP(config.smtp)
repo_inventario = RepositorioInventarioSQL(session)
servicio_facturacion = ServicioFacturacionAPI(config.api_facturacion)
# Handlers
notificador = NotificadorPedidos(servicio_email)
inventario = ActualizadorInventario(repo_inventario)
facturador = GeneradorFacturas(servicio_facturacion)
# Registrar handlers
despachador.registrar(PedidoConfirmado, notificador.al_confirmar_pedido)
despachador.registrar(PedidoConfirmado, inventario.al_confirmar_pedido)
despachador.registrar(PedidoCancelado, notificador.al_cancelar_pedido)
despachador.registrar(PedidoCancelado, inventario.al_cancelar_pedido)
despachador.registrar(PagoRecibido, facturador.al_recibir_pago)
# En el bootstrap de la aplicación
despachador = DespachadorEventos()
configurar_handlers(despachador)
Event Store (Persistencia de Eventos)
from dataclasses import dataclass
from datetime import datetime
@dataclass
class EventoAlmacenado:
id: int
agregado_id: UUID
tipo_agregado: str
tipo_evento: str
datos: dict
version: int
timestamp: datetime
class EventStore:
"""Almacena eventos para auditoría y event sourcing"""
def __init__(self, session: "Session"):
self._session = session
def guardar(self, agregado_id: UUID, eventos: list[DomainEvent]) -> None:
for evento in eventos:
registro = EventoAlmacenado(
agregado_id=agregado_id,
tipo_agregado=self._tipo_agregado(agregado_id),
tipo_evento=evento.nombre_evento,
datos=evento.to_dict(),
version=evento.version,
timestamp=evento.ocurrido_en,
)
self._session.add(registro)
self._session.flush()
def obtener_eventos(
self, agregado_id: UUID, desde_version: int = 0
) -> list[EventoAlmacenado]:
return (
self._session.query(EventoAlmacenado)
.filter(EventoAlmacenado.agregado_id == agregado_id)
.filter(EventoAlmacenado.version > desde_version)
.order_by(EventoAlmacenado.version)
.all()
)
Beneficios de Domain Events
┌─────────────────────────────────────────────────────────────┐
│ BENEFICIOS │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. DESACOPLAMIENTO │
│ Agregado ──▶ Evento ──▶ Handler │
│ El agregado no conoce quién escucha │
│ │
│ 2. EXTENSIBILIDAD │
│ Agregar nuevos handlers sin modificar el dominio │
│ Nuevo requisito = Nuevo handler │
│ │
│ 3. AUDITORÍA │
│ Historial completo de hechos del sistema │
│ Qué pasó, cuándo, con qué datos │
│ │
│ 4. INTEGRACIÓN │
│ Comunicar entre bounded contexts │
│ Publicar a sistemas externos │
│ │
│ 5. EVENT SOURCING │
│ Reconstruir estado desde eventos │
│ Proyecciones para diferentes vistas │
│ │
└─────────────────────────────────────────────────────────────┘
Checklist de Domain Events
- ¿El evento está nombrado en tiempo pasado?
- ¿El evento es inmutable (frozen=True)?
- ¿Contiene todos los datos necesarios para los handlers?
- ¿Se despacha después del commit de la transacción?
- ¿Los handlers son idempotentes?
- ¿Se registran los eventos para auditoría?
- ¿El evento refleja un concepto del lenguaje ubicuo?