← Volver al listado de tecnologías

CQRS en Python

Por: SiempreListo
cqrspythonarquitecturatyping

Capítulo 17: CQRS en Python

Python es un lenguaje interpretado conocido por su legibilidad y productividad. Con las características modernas de tipado (type hints), Python permite implementar CQRS de forma clara, mantenible y con verificación estática de tipos.

Por qué Python para CQRS

Python ofrece ventajas para CQRS:

Estructura del Proyecto

La estructura organiza el código por capas (domain, command, query, infrastructure) siguiendo principios de Clean Architecture.

orderflow-python/
├── src/
│   ├── domain/
│   │   ├── __init__.py
│   │   ├── order.py
│   │   └── events.py
│   ├── command/
│   │   ├── __init__.py
│   │   ├── bus.py
│   │   └── handlers/
│   │       ├── create_order.py
│   │       └── add_item.py
│   ├── query/
│   │   ├── __init__.py
│   │   ├── bus.py
│   │   └── handlers/
│   │       ├── get_order.py
│   │       └── list_orders.py
│   └── infrastructure/
│       ├── postgres/
│       ├── elasticsearch/
│       └── redis/
├── tests/
└── pyproject.toml

Interfaces Base

ABC (Abstract Base Class) permite definir interfaces que las clases concretas deben implementar. Es el equivalente de Python a las interfaces de otros lenguajes.

TypeVar y Generic permiten crear clases genéricas tipadas. C = TypeVar('C', bound=Command) crea un tipo variable que debe ser un Command o subclase.

dataclass con frozen=True crea clases inmutables: una vez creado el objeto, sus campos no pueden modificarse. Esto es importante para comandos y eventos que representan hechos inmutables.

# src/command/bus.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from dataclasses import dataclass

@dataclass(frozen=True)
class Command(ABC):
    pass

C = TypeVar('C', bound=Command)

class CommandHandler(ABC, Generic[C]):
    @abstractmethod
    async def handle(self, command: C) -> None:
        pass

class CommandBus:
    def __init__(self):
        self._handlers: dict[type, CommandHandler] = {}

    def register(self, command_type: type[C], handler: CommandHandler[C]) -> None:
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> None:
        handler = self._handlers.get(type(command))
        if not handler:
            raise ValueError(f"No handler for {type(command).__name__}")
        await handler.handle(command)

El Query Bus es similar pero permite retornar valores. El tipo R representa el tipo de retorno del handler.

# src/query/bus.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
from dataclasses import dataclass

@dataclass(frozen=True)
class Query(ABC):
    pass

Q = TypeVar('Q', bound=Query)
R = TypeVar('R')

class QueryHandler(ABC, Generic[Q, R]):
    @abstractmethod
    async def handle(self, query: Q) -> R:
        pass

class QueryBus:
    def __init__(self):
        self._handlers: dict[type, QueryHandler] = {}

    def register(self, query_type: type[Q], handler: QueryHandler[Q, R]) -> None:
        self._handlers[query_type] = handler

    async def ask(self, query: Query) -> any:
        handler = self._handlers.get(type(query))
        if not handler:
            raise ValueError(f"No handler for {type(query).__name__}")
        return await handler.handle(query)

Modelo de Dominio

El modelo de dominio usa dataclasses para definir la estructura de datos y Enum para estados con valores fijos.

El campo _events tiene repr=False para excluirlo de la representación string, y default_factory=list crea una nueva lista vacía para cada instancia.

El método de clase create() actua como factory method: crea la instancia y registra el evento de creación en un solo paso.

# src/domain/order.py
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import List
from .events import DomainEvent, OrderCreated, ItemAdded

class OrderStatus(Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"

@dataclass
class Item:
    product_id: str
    name: str
    price: float
    quantity: int

@dataclass
class Order:
    id: str
    customer_id: str
    items: List[Item] = field(default_factory=list)
    status: OrderStatus = OrderStatus.PENDING
    total: float = 0.0
    created_at: datetime = field(default_factory=datetime.now)
    _events: List[DomainEvent] = field(default_factory=list, repr=False)

    @classmethod
    def create(cls, order_id: str, customer_id: str) -> "Order":
        order = cls(id=order_id, customer_id=customer_id)
        order._record(OrderCreated(order_id=order_id, customer_id=customer_id))
        return order

    def add_item(self, item: Item) -> None:
        self.items.append(item)
        self.total += item.price * item.quantity
        self._record(ItemAdded(order_id=self.id, item=item))

    def _record(self, event: DomainEvent) -> None:
        self._events.append(event)

    def pull_events(self) -> List[DomainEvent]:
        events = self._events.copy()
        self._events.clear()
        return events

Eventos de Dominio

Los eventos son inmutables (frozen=True) porque representan hechos que ya ocurrieron y no deben modificarse.

default_factory=datetime.now genera automáticamente el timestamp cuando se crea el evento.

Las comillas en "Item" son una forward reference: permiten referenciar un tipo que aún no está definido en ese punto del archivo.

# src/domain/events.py
from dataclasses import dataclass, field
from datetime import datetime
from abc import ABC

@dataclass(frozen=True)
class DomainEvent(ABC):
    occurred_at: datetime = field(default_factory=datetime.now)

@dataclass(frozen=True)
class OrderCreated(DomainEvent):
    order_id: str
    customer_id: str

@dataclass(frozen=True)
class ItemAdded(DomainEvent):
    order_id: str
    item: "Item"

Command Handler

El handler hereda de CommandHandler[CreateOrderCommand], indicando que maneja específicamente ese tipo de comando.

async/await marca el método como asíncrono: puede pausarse mientras espera operaciones I/O (base de datos, red) sin bloquear el hilo.

# src/command/handlers/create_order.py
from dataclasses import dataclass
from ..bus import Command, CommandHandler
from ...domain.order import Order
from ...infrastructure.repositories import OrderRepository
from ...infrastructure.events import EventPublisher

@dataclass(frozen=True)
class CreateOrderCommand(Command):
    order_id: str
    customer_id: str

class CreateOrderHandler(CommandHandler[CreateOrderCommand]):
    def __init__(self, repo: OrderRepository, event_bus: EventPublisher):
        self._repo = repo
        self._event_bus = event_bus

    async def handle(self, command: CreateOrderCommand) -> None:
        order = Order.create(command.order_id, command.customer_id)
        await self._repo.save(order)

        for event in order.pull_events():
            await self._event_bus.publish(event)

Query Handler

El Query Handler especifica dos tipos genéricos: GetOrderQuery (el query que maneja) y Optional[OrderReadModel] (el tipo de retorno).

Optional indica que el método puede retornar None si el pedido no existe, en lugar de lanzar una excepción.

# src/query/handlers/get_order.py
from dataclasses import dataclass
from typing import Optional
from ..bus import Query, QueryHandler
from ...infrastructure.read_repositories import OrderReadRepository

@dataclass(frozen=True)
class GetOrderQuery(Query):
    order_id: str

@dataclass
class OrderReadModel:
    id: str
    customer_id: str
    items: list
    status: str
    total: float

class GetOrderHandler(QueryHandler[GetOrderQuery, Optional[OrderReadModel]]):
    def __init__(self, read_repo: OrderReadRepository):
        self._read_repo = read_repo

    async def handle(self, query: GetOrderQuery) -> Optional[OrderReadModel]:
        return await self._read_repo.find_by_id(query.order_id)

Configuración de Buses

El módulo bootstrap configura la aplicación: crea los buses y registra los handlers con sus dependencias.

Este patrón centraliza la configuración, facilitando modificaciones y testing (se puede crear un bootstrap alternativo para tests).

# src/bootstrap.py
from .command.bus import CommandBus
from .command.handlers.create_order import CreateOrderCommand, CreateOrderHandler
from .query.bus import QueryBus
from .query.handlers.get_order import GetOrderQuery, GetOrderHandler

def setup_command_bus(deps) -> CommandBus:
    bus = CommandBus()
    bus.register(CreateOrderCommand, CreateOrderHandler(deps.order_repo, deps.event_bus))
    return bus

def setup_query_bus(deps) -> QueryBus:
    bus = QueryBus()
    bus.register(GetOrderQuery, GetOrderHandler(deps.read_repo))
    return bus

Resumen

CQRS en Python:

Glosario

dataclass

Definición: Decorador de Python que genera automáticamente métodos especiales (__init__, __repr__, __eq__) basándose en los campos definidos como anotaciones de tipo.

Por qué es importante: Reduce boilerplate al crear clases de datos. Con frozen=True crea objetos inmutables ideales para comandos y eventos.

Ejemplo práctico: @dataclass(frozen=True) class CreateOrderCommand: order_id: str genera automáticamente el constructor y hace el objeto inmutable.


ABC (Abstract Base Class)

Definición: Clase base abstracta que define una interfaz. Las clases que heredan deben implementar todos los métodos marcados con @abstractmethod.

Por qué es importante: Permite definir contratos que las implementaciones concretas deben cumplir, similar a interfaces en otros lenguajes.

Ejemplo práctico: class CommandHandler(ABC) con @abstractmethod async def handle(self) obliga a que todo handler implemente el método handle.


Generic y TypeVar

Definición: Mecanismos de Python para crear clases y funciones genéricas que trabajan con tipos específicos determinados en tiempo de uso.

Por qué es importante: Permite crear handlers tipados donde el tipo de comando/query y retorno se especifican al heredar, mejorando la verificación estática.

Ejemplo práctico: CommandHandler[CreateOrderCommand] indica que este handler específico maneja CreateOrderCommand, no cualquier Command.


async/await

Definición: Palabras clave de Python para programación asíncrona. async def define una corutina, await pausa la ejecución hasta que una operación I/O complete.

Por qué es importante: Permite manejar miles de operaciones I/O concurrentes sin crear miles de threads, ideal para APIs con muchas conexiones.

Ejemplo práctico: await self._repo.save(order) pausa el handler mientras espera la base de datos, liberando el thread para procesar otros requests.


Enum

Definición: Clase que define un conjunto fijo de constantes con nombre. Los valores son únicos y no pueden modificarse.

Por qué es importante: Previene errores de tipeo y hace el código más legible. OrderStatus.PENDING es más claro y seguro que "pending".

Ejemplo práctico: class OrderStatus(Enum): PENDING = "pending" define estados válidos. Usar un valor inválido genera error.


Optional

Definición: Tipo que indica que un valor puede ser del tipo especificado o None. Optional[X] es equivalente a X | None.

Por qué es importante: Documenta explícitamente que una función puede retornar None, forzando al llamador a manejar ese caso.

Ejemplo práctico: -> Optional[OrderReadModel] indica que find_by_id puede retornar None si el pedido no existe.


Forward Reference

Definición: Referencia a un tipo usando un string en lugar del tipo directamente, necesario cuando el tipo aún no está definido en ese punto del código.

Por qué es importante: Permite referencias circulares entre tipos y referencias a tipos definidos más adelante en el archivo.

Ejemplo práctico: item: "Item" en el evento ItemAdded referencia la clase Item que está en otro módulo o definida después.


Factory Method

Definición: Patrón de diseño donde un método estático o de clase crea instancias de la clase, encapsulando la lógica de creación.

Por qué es importante: Permite crear objetos con lógica adicional (validación, registro de eventos) en un solo paso, garantizando consistencia.

Ejemplo práctico: Order.create(order_id, customer_id) crea el Order Y registra el evento OrderCreated, evitando que se olvide el evento.


pyproject.toml

Definición: Archivo de configuración estándar para proyectos Python que define metadata, dependencias y configuración de herramientas.

Por qué es importante: Centraliza la configuración del proyecto en un formato estándar, reemplazando setup.py, requirements.txt y múltiples archivos de config.

Ejemplo práctico: Define dependencias como fastapi = "^0.100", scripts como dev = "uvicorn main:app", y configuración de pytest, black, etc.


← Capítulo 16: Go Handlers | Capítulo 18: FastAPI y CQRS →