CQRS en Python
Capítulo 17: CQRS en Python
Python es un lenguaje interpretado conocido por su legibilidad y productividad. Con las características modernas de tipado (type hints), Python permite implementar CQRS de forma clara, mantenible y con verificación estática de tipos.
Por qué Python para CQRS
Python ofrece ventajas para CQRS:
- Legibilidad: Código claro que sirve como documentación
- async/await: Concurrencia nativa para I/O
- Type hints: Tipado opcional que mejora la mantenibilidad
- Ecosistema: FastAPI, SQLAlchemy, Pydantic para desarrollo rápido
Estructura del Proyecto
La estructura organiza el código por capas (domain, command, query, infrastructure) siguiendo principios de Clean Architecture.
orderflow-python/
├── src/
│ ├── domain/
│ │ ├── __init__.py
│ │ ├── order.py
│ │ └── events.py
│ ├── command/
│ │ ├── __init__.py
│ │ ├── bus.py
│ │ └── handlers/
│ │ ├── create_order.py
│ │ └── add_item.py
│ ├── query/
│ │ ├── __init__.py
│ │ ├── bus.py
│ │ └── handlers/
│ │ ├── get_order.py
│ │ └── list_orders.py
│ └── infrastructure/
│ ├── postgres/
│ ├── elasticsearch/
│ └── redis/
├── tests/
└── pyproject.toml
Interfaces Base
ABC (Abstract Base Class) permite definir interfaces que las clases concretas deben implementar. Es el equivalente de Python a las interfaces de otros lenguajes.
TypeVar y Generic permiten crear clases genéricas tipadas. C = TypeVar('C', bound=Command) crea un tipo variable que debe ser un Command o subclase.
dataclass con frozen=True crea clases inmutables: una vez creado el objeto, sus campos no pueden modificarse. Esto es importante para comandos y eventos que representan hechos inmutables.
# src/command/bus.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from dataclasses import dataclass
@dataclass(frozen=True)
class Command(ABC):
pass
C = TypeVar('C', bound=Command)
class CommandHandler(ABC, Generic[C]):
@abstractmethod
async def handle(self, command: C) -> None:
pass
class CommandBus:
def __init__(self):
self._handlers: dict[type, CommandHandler] = {}
def register(self, command_type: type[C], handler: CommandHandler[C]) -> None:
self._handlers[command_type] = handler
async def dispatch(self, command: Command) -> None:
handler = self._handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for {type(command).__name__}")
await handler.handle(command)
El Query Bus es similar pero permite retornar valores. El tipo R representa el tipo de retorno del handler.
# src/query/bus.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from dataclasses import dataclass
@dataclass(frozen=True)
class Query(ABC):
pass
Q = TypeVar('Q', bound=Query)
R = TypeVar('R')
class QueryHandler(ABC, Generic[Q, R]):
@abstractmethod
async def handle(self, query: Q) -> R:
pass
class QueryBus:
def __init__(self):
self._handlers: dict[type, QueryHandler] = {}
def register(self, query_type: type[Q], handler: QueryHandler[Q, R]) -> None:
self._handlers[query_type] = handler
async def ask(self, query: Query) -> any:
handler = self._handlers.get(type(query))
if not handler:
raise ValueError(f"No handler for {type(query).__name__}")
return await handler.handle(query)
Modelo de Dominio
El modelo de dominio usa dataclasses para definir la estructura de datos y Enum para estados con valores fijos.
El campo _events tiene repr=False para excluirlo de la representación string, y default_factory=list crea una nueva lista vacía para cada instancia.
El método de clase create() actua como factory method: crea la instancia y registra el evento de creación en un solo paso.
# src/domain/order.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List
from .events import DomainEvent, OrderCreated, ItemAdded
class OrderStatus(Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
@dataclass
class Item:
product_id: str
name: str
price: float
quantity: int
@dataclass
class Order:
id: str
customer_id: str
items: List[Item] = field(default_factory=list)
status: OrderStatus = OrderStatus.PENDING
total: float = 0.0
created_at: datetime = field(default_factory=datetime.now)
_events: List[DomainEvent] = field(default_factory=list, repr=False)
@classmethod
def create(cls, order_id: str, customer_id: str) -> "Order":
order = cls(id=order_id, customer_id=customer_id)
order._record(OrderCreated(order_id=order_id, customer_id=customer_id))
return order
def add_item(self, item: Item) -> None:
self.items.append(item)
self.total += item.price * item.quantity
self._record(ItemAdded(order_id=self.id, item=item))
def _record(self, event: DomainEvent) -> None:
self._events.append(event)
def pull_events(self) -> List[DomainEvent]:
events = self._events.copy()
self._events.clear()
return events
Eventos de Dominio
Los eventos son inmutables (frozen=True) porque representan hechos que ya ocurrieron y no deben modificarse.
default_factory=datetime.now genera automáticamente el timestamp cuando se crea el evento.
Las comillas en "Item" son una forward reference: permiten referenciar un tipo que aún no está definido en ese punto del archivo.
# src/domain/events.py
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC
@dataclass(frozen=True)
class DomainEvent(ABC):
occurred_at: datetime = field(default_factory=datetime.now)
@dataclass(frozen=True)
class OrderCreated(DomainEvent):
order_id: str
customer_id: str
@dataclass(frozen=True)
class ItemAdded(DomainEvent):
order_id: str
item: "Item"
Command Handler
El handler hereda de CommandHandler[CreateOrderCommand], indicando que maneja específicamente ese tipo de comando.
async/await marca el método como asíncrono: puede pausarse mientras espera operaciones I/O (base de datos, red) sin bloquear el hilo.
# src/command/handlers/create_order.py
from dataclasses import dataclass
from ..bus import Command, CommandHandler
from ...domain.order import Order
from ...infrastructure.repositories import OrderRepository
from ...infrastructure.events import EventPublisher
@dataclass(frozen=True)
class CreateOrderCommand(Command):
order_id: str
customer_id: str
class CreateOrderHandler(CommandHandler[CreateOrderCommand]):
def __init__(self, repo: OrderRepository, event_bus: EventPublisher):
self._repo = repo
self._event_bus = event_bus
async def handle(self, command: CreateOrderCommand) -> None:
order = Order.create(command.order_id, command.customer_id)
await self._repo.save(order)
for event in order.pull_events():
await self._event_bus.publish(event)
Query Handler
El Query Handler especifica dos tipos genéricos: GetOrderQuery (el query que maneja) y Optional[OrderReadModel] (el tipo de retorno).
Optional indica que el método puede retornar None si el pedido no existe, en lugar de lanzar una excepción.
# src/query/handlers/get_order.py
from dataclasses import dataclass
from typing import Optional
from ..bus import Query, QueryHandler
from ...infrastructure.read_repositories import OrderReadRepository
@dataclass(frozen=True)
class GetOrderQuery(Query):
order_id: str
@dataclass
class OrderReadModel:
id: str
customer_id: str
items: list
status: str
total: float
class GetOrderHandler(QueryHandler[GetOrderQuery, Optional[OrderReadModel]]):
def __init__(self, read_repo: OrderReadRepository):
self._read_repo = read_repo
async def handle(self, query: GetOrderQuery) -> Optional[OrderReadModel]:
return await self._read_repo.find_by_id(query.order_id)
Configuración de Buses
El módulo bootstrap configura la aplicación: crea los buses y registra los handlers con sus dependencias.
Este patrón centraliza la configuración, facilitando modificaciones y testing (se puede crear un bootstrap alternativo para tests).
# src/bootstrap.py
from .command.bus import CommandBus
from .command.handlers.create_order import CreateOrderCommand, CreateOrderHandler
from .query.bus import QueryBus
from .query.handlers.get_order import GetOrderQuery, GetOrderHandler
def setup_command_bus(deps) -> CommandBus:
bus = CommandBus()
bus.register(CreateOrderCommand, CreateOrderHandler(deps.order_repo, deps.event_bus))
return bus
def setup_query_bus(deps) -> QueryBus:
bus = QueryBus()
bus.register(GetOrderQuery, GetOrderHandler(deps.read_repo))
return bus
Resumen
CQRS en Python:
- Usa dataclasses para comandos, queries y eventos inmutables
- Typing genérico para handlers tipados
- Separación clara entre escritura y lectura
- Patrón bus para desacoplar handlers
Glosario
dataclass
Definición: Decorador de Python que genera automáticamente métodos especiales (__init__, __repr__, __eq__) basándose en los campos definidos como anotaciones de tipo.
Por qué es importante: Reduce boilerplate al crear clases de datos. Con frozen=True crea objetos inmutables ideales para comandos y eventos.
Ejemplo práctico: @dataclass(frozen=True) class CreateOrderCommand: order_id: str genera automáticamente el constructor y hace el objeto inmutable.
ABC (Abstract Base Class)
Definición: Clase base abstracta que define una interfaz. Las clases que heredan deben implementar todos los métodos marcados con @abstractmethod.
Por qué es importante: Permite definir contratos que las implementaciones concretas deben cumplir, similar a interfaces en otros lenguajes.
Ejemplo práctico: class CommandHandler(ABC) con @abstractmethod async def handle(self) obliga a que todo handler implemente el método handle.
Generic y TypeVar
Definición: Mecanismos de Python para crear clases y funciones genéricas que trabajan con tipos específicos determinados en tiempo de uso.
Por qué es importante: Permite crear handlers tipados donde el tipo de comando/query y retorno se especifican al heredar, mejorando la verificación estática.
Ejemplo práctico: CommandHandler[CreateOrderCommand] indica que este handler específico maneja CreateOrderCommand, no cualquier Command.
async/await
Definición: Palabras clave de Python para programación asíncrona. async def define una corutina, await pausa la ejecución hasta que una operación I/O complete.
Por qué es importante: Permite manejar miles de operaciones I/O concurrentes sin crear miles de threads, ideal para APIs con muchas conexiones.
Ejemplo práctico: await self._repo.save(order) pausa el handler mientras espera la base de datos, liberando el thread para procesar otros requests.
Enum
Definición: Clase que define un conjunto fijo de constantes con nombre. Los valores son únicos y no pueden modificarse.
Por qué es importante: Previene errores de tipeo y hace el código más legible. OrderStatus.PENDING es más claro y seguro que "pending".
Ejemplo práctico: class OrderStatus(Enum): PENDING = "pending" define estados válidos. Usar un valor inválido genera error.
Optional
Definición: Tipo que indica que un valor puede ser del tipo especificado o None. Optional[X] es equivalente a X | None.
Por qué es importante: Documenta explícitamente que una función puede retornar None, forzando al llamador a manejar ese caso.
Ejemplo práctico: -> Optional[OrderReadModel] indica que find_by_id puede retornar None si el pedido no existe.
Forward Reference
Definición: Referencia a un tipo usando un string en lugar del tipo directamente, necesario cuando el tipo aún no está definido en ese punto del código.
Por qué es importante: Permite referencias circulares entre tipos y referencias a tipos definidos más adelante en el archivo.
Ejemplo práctico: item: "Item" en el evento ItemAdded referencia la clase Item que está en otro módulo o definida después.
Factory Method
Definición: Patrón de diseño donde un método estático o de clase crea instancias de la clase, encapsulando la lógica de creación.
Por qué es importante: Permite crear objetos con lógica adicional (validación, registro de eventos) en un solo paso, garantizando consistencia.
Ejemplo práctico: Order.create(order_id, customer_id) crea el Order Y registra el evento OrderCreated, evitando que se olvide el evento.
pyproject.toml
Definición: Archivo de configuración estándar para proyectos Python que define metadata, dependencias y configuración de herramientas.
Por qué es importante: Centraliza la configuración del proyecto en un formato estándar, reemplazando setup.py, requirements.txt y múltiples archivos de config.
Ejemplo práctico: Define dependencias como fastapi = "^0.100", scripts como dev = "uvicorn main:app", y configuración de pytest, black, etc.