← Volver al listado de tecnologías

Capítulo 8: Domain Events

Por: Artiko
ddddomain-eventspythontactical-design

Capítulo 8: Domain Events

¿Qué es un Domain Event?

Un Domain Event representa algo que ocurrió en el dominio y que es relevante para el negocio. Es un hecho inmutable del pasado.

flowchart LR
    subgraph Origen["Agregado"]
        A["Pedido.confirmar()"]
    end

    subgraph Evento["Domain Event"]
        B["PedidoConfirmado\n- pedido_id\n- total\n- fecha"]
    end

    subgraph Handlers["Event Handlers"]
        C["Notificar Cliente"]
        D["Reservar Stock"]
        E["Generar Factura"]
    end

    A -->|"emite"| B
    B --> C
    B --> D
    B --> E

“Un domain event es algo que ocurrió en el dominio que quieres que otras partes del sistema conozcan.” — Eric Evans

Características de Domain Events

CaracterísticaDescripción
InmutableUna vez creado, no cambia
PasadoNombrado en tiempo pasado
RelevanteSignificativo para el negocio
AutónomoContiene todos los datos necesarios
Con timestampRegistra cuándo ocurrió

Estructura Base

from dataclasses import dataclass, field
from datetime import datetime
from uuid import UUID, uuid4
from typing import Any

@dataclass(frozen=True)
class DomainEvent:
    """Clase base para todos los eventos de dominio"""
    event_id: UUID = field(default_factory=uuid4)
    ocurrido_en: datetime = field(default_factory=datetime.now)
    version: int = 1

    @property
    def nombre_evento(self) -> str:
        return self.__class__.__name__

    def to_dict(self) -> dict[str, Any]:
        """Serialización para persistencia o mensajería"""
        return {
            "event_id": str(self.event_id),
            "nombre": self.nombre_evento,
            "ocurrido_en": self.ocurrido_en.isoformat(),
            "version": self.version,
            "datos": self._datos_especificos(),
        }

    def _datos_especificos(self) -> dict[str, Any]:
        """Sobrescribir en subclases"""
        return {}

Eventos Específicos del Dominio

from decimal import Decimal

@dataclass(frozen=True)
class PedidoConfirmado(DomainEvent):
    """Emitido cuando un pedido es confirmado"""
    pedido_id: UUID = None
    cliente_id: UUID = None
    total: Decimal = Decimal("0")
    cantidad_productos: int = 0
    direccion_envio: str = ""

    def _datos_especificos(self) -> dict:
        return {
            "pedido_id": str(self.pedido_id),
            "cliente_id": str(self.cliente_id),
            "total": str(self.total),
            "cantidad_productos": self.cantidad_productos,
        }

@dataclass(frozen=True)
class PedidoCancelado(DomainEvent):
    pedido_id: UUID = None
    motivo: str = ""
    solicitado_por: str = ""  # "cliente" o "sistema"

@dataclass(frozen=True)
class PagoRecibido(DomainEvent):
    pedido_id: UUID = None
    monto: Decimal = Decimal("0")
    metodo_pago: str = ""
    referencia_transaccion: str = ""
    procesador: str = ""

@dataclass(frozen=True)
class ProductoAgotado(DomainEvent):
    producto_id: UUID = None
    sku: str = ""
    almacen_id: UUID = None
    ultima_cantidad: int = 0

@dataclass(frozen=True)
class ClienteRegistrado(DomainEvent):
    cliente_id: UUID = None
    email: str = ""
    nombre: str = ""
    tipo_cliente: str = "regular"

@dataclass(frozen=True)
class ReservaCreada(DomainEvent):
    reserva_id: UUID = None
    habitacion_id: UUID = None
    huesped_id: UUID = None
    fecha_entrada: str = ""
    fecha_salida: str = ""
    precio_total: Decimal = Decimal("0")

Agregado con Eventos

from dataclasses import dataclass, field

@dataclass
class Pedido:
    id: UUID = field(default_factory=uuid4)
    cliente_id: UUID = None
    _lineas: list["LineaPedido"] = field(default_factory=list)
    _estado: "EstadoPedido" = None
    _eventos: list[DomainEvent] = field(default_factory=list)

    def confirmar(self) -> None:
        if self._estado != EstadoPedido.PAGADO:
            raise PedidoNoConfirmable("Pedido no está pagado")

        self._estado = EstadoPedido.CONFIRMADO

        # Registrar evento de dominio
        self._eventos.append(
            PedidoConfirmado(
                pedido_id=self.id,
                cliente_id=self.cliente_id,
                total=self.total,
                cantidad_productos=len(self._lineas),
            )
        )

    def cancelar(self, motivo: str, solicitado_por: str = "cliente") -> None:
        if self._estado in (EstadoPedido.ENVIADO, EstadoPedido.ENTREGADO):
            raise PedidoNoCancelable()

        self._estado = EstadoPedido.CANCELADO

        self._eventos.append(
            PedidoCancelado(
                pedido_id=self.id,
                motivo=motivo,
                solicitado_por=solicitado_por,
            )
        )

    def registrar_pago(
        self, monto: Decimal, metodo: str, referencia: str
    ) -> None:
        if self._estado != EstadoPedido.PENDIENTE_PAGO:
            raise EstadoInvalido()

        self._estado = EstadoPedido.PAGADO

        self._eventos.append(
            PagoRecibido(
                pedido_id=self.id,
                monto=monto,
                metodo_pago=metodo,
                referencia_transaccion=referencia,
            )
        )

    # === Gestión de eventos ===

    def eventos_pendientes(self) -> list[DomainEvent]:
        """Retorna copia de eventos pendientes"""
        return list(self._eventos)

    def limpiar_eventos(self) -> None:
        """Limpia eventos después de publicarlos"""
        self._eventos.clear()

    def obtener_y_limpiar_eventos(self) -> list[DomainEvent]:
        """Obtiene eventos y los limpia en una operación"""
        eventos = list(self._eventos)
        self._eventos.clear()
        return eventos

Despachador de Eventos

from typing import Callable, Type
from collections import defaultdict

class DespachadorEventos:
    """Publica eventos a sus handlers registrados"""

    def __init__(self):
        self._handlers: dict[Type[DomainEvent], list[Callable]] = defaultdict(list)

    def registrar(
        self,
        tipo_evento: Type[DomainEvent],
        handler: Callable[[DomainEvent], None]
    ) -> None:
        """Registra un handler para un tipo de evento"""
        self._handlers[tipo_evento].append(handler)

    def desregistrar(
        self,
        tipo_evento: Type[DomainEvent],
        handler: Callable
    ) -> None:
        """Elimina un handler"""
        if handler in self._handlers[tipo_evento]:
            self._handlers[tipo_evento].remove(handler)

    def despachar(self, evento: DomainEvent) -> None:
        """Despacha un evento a todos sus handlers"""
        tipo = type(evento)
        for handler in self._handlers[tipo]:
            try:
                handler(evento)
            except Exception as e:
                # Log error pero continuar con otros handlers
                print(f"Error en handler: {e}")

    def despachar_todos(self, eventos: list[DomainEvent]) -> None:
        """Despacha múltiples eventos en orden"""
        for evento in eventos:
            self.despachar(evento)

class DespachadorEventosAsync:
    """Versión asíncrona del despachador"""

    def __init__(self, cola_mensajes: "ColaMensajes"):
        self._cola = cola_mensajes

    def despachar(self, evento: DomainEvent) -> None:
        """Encola el evento para procesamiento asíncrono"""
        self._cola.publicar(
            canal=evento.nombre_evento,
            mensaje=evento.to_dict(),
        )

Handlers de Eventos

class NotificadorPedidos:
    """Handler que envía notificaciones al confirmar pedidos"""

    def __init__(self, servicio_email: "ServicioEmail"):
        self._email = servicio_email

    def al_confirmar_pedido(self, evento: PedidoConfirmado) -> None:
        self._email.enviar(
            destinatario=self._obtener_email_cliente(evento.cliente_id),
            asunto=f"Pedido {evento.pedido_id} confirmado",
            plantilla="pedido_confirmado",
            datos={
                "pedido_id": str(evento.pedido_id),
                "total": str(evento.total),
                "productos": evento.cantidad_productos,
            },
        )

    def al_cancelar_pedido(self, evento: PedidoCancelado) -> None:
        self._email.enviar(
            destinatario=self._obtener_email_cliente_por_pedido(evento.pedido_id),
            asunto=f"Pedido {evento.pedido_id} cancelado",
            plantilla="pedido_cancelado",
            datos={"motivo": evento.motivo},
        )

class ActualizadorInventario:
    """Handler que actualiza inventario al confirmar pedidos"""

    def __init__(self, repo_inventario: "RepositorioInventario"):
        self._repo = repo_inventario

    def al_confirmar_pedido(self, evento: PedidoConfirmado) -> None:
        # Reservar stock para los productos del pedido
        pedido = self._obtener_pedido(evento.pedido_id)
        for linea in pedido.obtener_lineas():
            self._repo.reservar_stock(
                producto_id=linea.producto_id,
                cantidad=linea.cantidad,
                referencia=f"PEDIDO-{evento.pedido_id}",
            )

    def al_cancelar_pedido(self, evento: PedidoCancelado) -> None:
        # Liberar stock reservado
        self._repo.liberar_reserva(
            referencia=f"PEDIDO-{evento.pedido_id}"
        )

class GeneradorFacturas:
    """Handler que genera facturas al recibir pagos"""

    def __init__(self, servicio_facturacion: "ServicioFacturacion"):
        self._facturacion = servicio_facturacion

    def al_recibir_pago(self, evento: PagoRecibido) -> None:
        self._facturacion.generar_factura(
            pedido_id=evento.pedido_id,
            monto=evento.monto,
            referencia_pago=evento.referencia_transaccion,
        )

Integración en el Caso de Uso

class ConfirmarPedidoHandler:
    def __init__(
        self,
        uow: "UnitOfWork",
        despachador: DespachadorEventos
    ):
        self._uow = uow
        self._despachador = despachador

    def ejecutar(self, comando: "ConfirmarPedidoCommand") -> None:
        with self._uow:
            pedido = self._uow.pedidos.obtener(comando.pedido_id)
            if not pedido:
                raise PedidoNoEncontrado(comando.pedido_id)

            # Ejecutar lógica de negocio (genera eventos)
            pedido.confirmar()

            # Persistir cambios
            self._uow.pedidos.guardar(pedido)
            self._uow.commit()

            # Despachar eventos DESPUÉS del commit
            eventos = pedido.obtener_y_limpiar_eventos()
            self._despachador.despachar_todos(eventos)

Configuración del Sistema

def configurar_handlers(despachador: DespachadorEventos) -> None:
    """Configura todos los handlers de eventos"""

    # Servicios de infraestructura
    servicio_email = ServicioEmailSMTP(config.smtp)
    repo_inventario = RepositorioInventarioSQL(session)
    servicio_facturacion = ServicioFacturacionAPI(config.api_facturacion)

    # Handlers
    notificador = NotificadorPedidos(servicio_email)
    inventario = ActualizadorInventario(repo_inventario)
    facturador = GeneradorFacturas(servicio_facturacion)

    # Registrar handlers
    despachador.registrar(PedidoConfirmado, notificador.al_confirmar_pedido)
    despachador.registrar(PedidoConfirmado, inventario.al_confirmar_pedido)

    despachador.registrar(PedidoCancelado, notificador.al_cancelar_pedido)
    despachador.registrar(PedidoCancelado, inventario.al_cancelar_pedido)

    despachador.registrar(PagoRecibido, facturador.al_recibir_pago)

# En el bootstrap de la aplicación
despachador = DespachadorEventos()
configurar_handlers(despachador)

Event Store (Persistencia de Eventos)

from dataclasses import dataclass
from datetime import datetime

@dataclass
class EventoAlmacenado:
    id: int
    agregado_id: UUID
    tipo_agregado: str
    tipo_evento: str
    datos: dict
    version: int
    timestamp: datetime

class EventStore:
    """Almacena eventos para auditoría y event sourcing"""

    def __init__(self, session: "Session"):
        self._session = session

    def guardar(self, agregado_id: UUID, eventos: list[DomainEvent]) -> None:
        for evento in eventos:
            registro = EventoAlmacenado(
                agregado_id=agregado_id,
                tipo_agregado=self._tipo_agregado(agregado_id),
                tipo_evento=evento.nombre_evento,
                datos=evento.to_dict(),
                version=evento.version,
                timestamp=evento.ocurrido_en,
            )
            self._session.add(registro)
        self._session.flush()

    def obtener_eventos(
        self, agregado_id: UUID, desde_version: int = 0
    ) -> list[EventoAlmacenado]:
        return (
            self._session.query(EventoAlmacenado)
            .filter(EventoAlmacenado.agregado_id == agregado_id)
            .filter(EventoAlmacenado.version > desde_version)
            .order_by(EventoAlmacenado.version)
            .all()
        )

Beneficios de Domain Events

┌─────────────────────────────────────────────────────────────┐
│                   BENEFICIOS                                 │
├─────────────────────────────────────────────────────────────┤
│                                                              │
│  1. DESACOPLAMIENTO                                         │
│     Agregado ──▶ Evento ──▶ Handler                         │
│     El agregado no conoce quién escucha                     │
│                                                              │
│  2. EXTENSIBILIDAD                                          │
│     Agregar nuevos handlers sin modificar el dominio        │
│     Nuevo requisito = Nuevo handler                         │
│                                                              │
│  3. AUDITORÍA                                               │
│     Historial completo de hechos del sistema                │
│     Qué pasó, cuándo, con qué datos                         │
│                                                              │
│  4. INTEGRACIÓN                                             │
│     Comunicar entre bounded contexts                        │
│     Publicar a sistemas externos                            │
│                                                              │
│  5. EVENT SOURCING                                          │
│     Reconstruir estado desde eventos                        │
│     Proyecciones para diferentes vistas                     │
│                                                              │
└─────────────────────────────────────────────────────────────┘

Checklist de Domain Events