Capítulo 14: Event Sourcing en Python
Capítulo 14: Event Sourcing en Python
“Python combina expresividad con productividad para Event Sourcing”
Por Qué Python para Event Sourcing
Python es un lenguaje interpretado conocido por su legibilidad y ecosistema maduro. Para Event Sourcing ofrece:
- Dataclasses: Sintaxis elegante para definir eventos inmutables con validación
- Type hints: Tipado opcional que mejora el tooling y la documentación
- Async/await: Soporte nativo para operaciones asíncronas (I/O con bases de datos)
- FastAPI: Framework moderno que genera documentación OpenAPI automáticamente
En este capítulo configuraremos un proyecto Python moderno usando las últimas herramientas del ecosistema.
Setup del Proyecto
# Crear proyecto con uv (gestor de paquetes moderno para Python)
# uv es significativamente más rápido que pip y poetry
mkdir orderflow-es-python && cd orderflow-es-python
uv init
# Instalar dependencias de producción
# fastapi: framework web asíncrono de alto rendimiento
# uvicorn: servidor ASGI para ejecutar FastAPI
# pydantic: validación de datos con type hints
# sqlalchemy: ORM para interactuar con bases de datos
# asyncpg: driver PostgreSQL asíncrono de alto rendimiento
uv add fastapi uvicorn pydantic sqlalchemy asyncpg
# Instalar dependencias de desarrollo
# pytest: framework de testing
# pytest-asyncio: soporte para tests async
# ruff: linter y formatter ultra-rápido
# mypy: verificador de tipos estático
uv add --dev pytest pytest-asyncio pytest-cov ruff mypy
Estructura del Proyecto
La estructura sigue el patrón de clean architecture con separación clara entre capas:
orderflow-es-python/
├── src/ # Código fuente principal
│ └── orderflow/ # Paquete principal
│ ├── __init__.py
│ ├── domain/
│ │ ├── __init__.py
│ │ ├── events/
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ └── order_events.py
│ │ ├── aggregates/
│ │ │ ├── __init__.py
│ │ │ └── order.py
│ │ └── value_objects/
│ │ ├── __init__.py
│ │ ├── address.py
│ │ └── money.py
│ ├── application/
│ │ ├── __init__.py
│ │ ├── commands/
│ │ │ └── create_order.py
│ │ ├── queries/
│ │ │ └── get_order.py
│ │ └── projections/
│ │ └── orders_projection.py
│ ├── infrastructure/
│ │ ├── __init__.py
│ │ ├── event_store/
│ │ │ ├── __init__.py
│ │ │ ├── interface.py
│ │ │ ├── postgres.py
│ │ │ └── inmemory.py
│ │ └── repository/
│ │ └── order_repository.py
│ └── api/
│ ├── __init__.py
│ ├── main.py
│ └── routes/
│ └── orders.py
├── tests/
│ ├── __init__.py
│ ├── domain/
│ │ └── test_order.py
│ └── infrastructure/
│ └── test_event_store.py
├── pyproject.toml
└── docker-compose.yml
Configuración pyproject.toml
[project]
name = "orderflow-es-python"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.109.0",
"uvicorn>=0.27.0",
"pydantic>=2.5.0",
"sqlalchemy>=2.0.0",
"asyncpg>=0.29.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.23.0",
"pytest-cov>=4.1.0",
"ruff>=0.1.0",
"mypy>=1.8.0",
]
[tool.ruff]
line-length = 88
target-version = "py312"
[tool.mypy]
python_version = "3.12"
strict = true
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
Eventos Base
En Python usamos dataclasses con el decorador frozen=True para crear objetos inmutables. La inmutabilidad es crucial en Event Sourcing: un evento representa un hecho histórico que nunca debe modificarse.
El patrón Generic[T] permite tipar el payload del evento, mejorando el soporte de IDE y detectando errores en tiempo de desarrollo.
# src/orderflow/domain/events/base.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, TypeVar, Generic
from uuid import uuid4
@dataclass(frozen=True)
class EventMetadata:
correlation_id: str = field(default_factory=lambda: str(uuid4()))
causation_id: str = field(default_factory=lambda: str(uuid4()))
user_id: str | None = None
timestamp: datetime = field(default_factory=datetime.utcnow)
T = TypeVar("T")
@dataclass(frozen=True)
class DomainEvent(Generic[T]):
event_id: str
event_type: str
aggregate_id: str
aggregate_type: str
version: int
payload: T
metadata: EventMetadata
@classmethod
def create(
cls,
event_type: str,
aggregate_id: str,
aggregate_type: str,
version: int,
payload: T,
metadata: EventMetadata | None = None,
) -> "DomainEvent[T]":
return cls(
event_id=str(uuid4()),
event_type=event_type,
aggregate_id=aggregate_id,
aggregate_type=aggregate_type,
version=version,
payload=payload,
metadata=metadata or EventMetadata(),
)
Value Objects
Los Value Objects se definen como dataclasses inmutables (frozen=True). El método __post_init__ se ejecuta automáticamente después del constructor, permitiendo validación sin código repetitivo.
# src/orderflow/domain/value_objects/money.py
from dataclasses import dataclass
@dataclass(frozen=True)
class Money:
amount: float
currency: str = "USD"
def __post_init__(self):
if self.amount < 0:
raise ValueError("Amount cannot be negative")
if len(self.currency) != 3:
raise ValueError("Currency must be 3 characters")
def add(self, other: "Money") -> "Money":
if self.currency != other.currency:
raise ValueError("Cannot add different currencies")
return Money(self.amount + other.amount, self.currency)
def multiply(self, factor: float) -> "Money":
return Money(self.amount * factor, self.currency)
# src/orderflow/domain/value_objects/address.py
from dataclasses import dataclass
@dataclass(frozen=True)
class Address:
street: str
city: str
state: str
zip_code: str
country: str
def __post_init__(self):
if not all([self.street, self.city, self.state, self.zip_code]):
raise ValueError("All address fields are required")
Eventos de Order
# src/orderflow/domain/events/order_events.py
from dataclasses import dataclass
from datetime import datetime
from ..value_objects.money import Money
from ..value_objects.address import Address
from .base import DomainEvent, EventMetadata
@dataclass(frozen=True)
class OrderItem:
product_id: str
product_name: str
sku: str
quantity: int
unit_price: Money
@dataclass(frozen=True)
class OrderCreatedPayload:
customer_id: str
customer_email: str
items: tuple[OrderItem, ...]
shipping_address: Address
subtotal: Money
tax: Money
total: Money
@dataclass(frozen=True)
class OrderItemAddedPayload:
item: OrderItem
new_subtotal: Money
new_total: Money
@dataclass(frozen=True)
class OrderConfirmedPayload:
confirmed_at: datetime
estimated_delivery: datetime
@dataclass(frozen=True)
class PaymentReceivedPayload:
payment_id: str
amount: Money
method: str
transaction_id: str
paid_at: datetime
@dataclass(frozen=True)
class OrderShippedPayload:
tracking_number: str
carrier: str
shipped_at: datetime
estimated_delivery: datetime
@dataclass(frozen=True)
class OrderCancelledPayload:
reason: str
cancelled_by: str
cancelled_at: datetime
refund_required: bool
# Type aliases
OrderCreated = DomainEvent[OrderCreatedPayload]
OrderItemAdded = DomainEvent[OrderItemAddedPayload]
OrderConfirmed = DomainEvent[OrderConfirmedPayload]
PaymentReceived = DomainEvent[PaymentReceivedPayload]
OrderShipped = DomainEvent[OrderShippedPayload]
OrderCancelled = DomainEvent[OrderCancelledPayload]
# Factory functions
def create_order_created(
order_id: str,
version: int,
payload: OrderCreatedPayload,
) -> OrderCreated:
return DomainEvent.create(
event_type="OrderCreated",
aggregate_id=order_id,
aggregate_type="Order",
version=version,
payload=payload,
)
Event Store Interface
Python usa ABC (Abstract Base Class) para definir interfaces. El decorador @abstractmethod marca métodos que las subclases deben implementar obligatoriamente.
Las operaciones son async porque interactúan con I/O (base de datos), permitiendo que otras tareas se ejecuten mientras esperamos respuestas.
# src/orderflow/infrastructure/event_store/interface.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any
@dataclass
class StoredEvent:
global_position: int
stream_id: str
stream_position: int
event_type: str
data: dict[str, Any]
metadata: dict[str, Any]
created_at: datetime
@dataclass
class AppendResult:
next_expected_version: int
global_position: int
events_appended: int
class ConcurrencyError(Exception):
def __init__(self, stream_id: str, expected: int, actual: int):
self.stream_id = stream_id
self.expected_version = expected
self.actual_version = actual
super().__init__(
f"Concurrency conflict on {stream_id}: "
f"expected {expected}, actual {actual}"
)
class EventStore(ABC):
@abstractmethod
async def append(
self,
stream_id: str,
events: list,
expected_version: int,
) -> AppendResult:
pass
@abstractmethod
async def read_stream(
self,
stream_id: str,
from_version: int = 0,
) -> list[StoredEvent]:
pass
@abstractmethod
async def read_all(
self,
from_position: int = 0,
limit: int = 1000,
) -> list[StoredEvent]:
pass
@abstractmethod
async def get_stream_version(self, stream_id: str) -> int:
pass
API con FastAPI
FastAPI es un framework web moderno que combina alto rendimiento (basado en Starlette) con generación automática de documentación OpenAPI. El lifespan context manager gestiona el ciclo de vida de la aplicación (startup/shutdown).
# src/orderflow/api/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
print("Starting up...")
yield
# Shutdown
print("Shutting down...")
app = FastAPI(
title="OrderFlow Event Sourcing",
version="1.0.0",
lifespan=lifespan,
)
@app.get("/health")
async def health():
return {"status": "ok"}
Docker Compose
# docker-compose.yml
services:
postgres:
image: postgres:16-alpine
environment:
POSTGRES_USER: orderflow
POSTGRES_PASSWORD: orderflow
POSTGRES_DB: orderflow
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
api:
build: .
ports:
- "8000:8000"
environment:
DATABASE_URL: postgres://orderflow:orderflow@postgres:5432/orderflow
depends_on:
- postgres
volumes:
postgres_data:
Ejecutar el Proyecto
# Desarrollo
uv run uvicorn src.orderflow.api.main:app --reload
# Tests
uv run pytest
# Type checking
uv run mypy src/
# Linting
uv run ruff check src/
Resumen
- Python 3.12+ con dataclasses inmutables
- FastAPI para API REST asíncrona
- Pydantic para validación de datos
- Estructura clean architecture
- uv como package manager moderno
Glosario
uv (Package Manager)
Definición: Gestor de paquetes y entornos virtuales para Python, escrito en Rust, que reemplaza pip, pip-tools, y virtualenv.
Por qué es importante: Es 10-100x más rápido que pip, maneja resolución de dependencias de forma correcta, y simplifica la gestión de proyectos.
Ejemplo práctico: uv add fastapi instala FastAPI y todas sus dependencias, actualizando pyproject.toml y uv.lock automáticamente.
Dataclass con frozen=True
Definición: Clase de datos en Python donde las instancias son inmutables (no se pueden modificar después de crear).
Por qué es importante: Los eventos deben ser inmutables ya que representan hechos históricos. Intentar modificarlos lanza una excepción.
Ejemplo práctico: @dataclass(frozen=True) class Money crea objetos Money que no pueden cambiar; money.amount = 100 lanzaría FrozenInstanceError.
post_init
Definición: Método especial de dataclasses que se ejecuta automáticamente después del constructor generado.
Por qué es importante: Permite validar datos al crear el objeto sin escribir un __init__ manual.
Ejemplo práctico: En Money.__post_init__, validamos que amount >= 0 y currency tenga 3 caracteres, lanzando ValueError si no cumple.
TypeVar y Generic[T]
Definición: Mecanismos de Python para crear tipos genéricos que funcionan con diferentes tipos concretos manteniendo type safety.
Por qué es importante: Permite que DomainEvent[T] sea tipado correctamente: DomainEvent[OrderCreatedPayload] indica que el payload es de ese tipo específico.
Ejemplo práctico: El IDE puede autocompletar event.payload.customer_id porque sabe que el payload es OrderCreatedPayload, no un dict genérico.
ABC (Abstract Base Class)
Definición: Clase que no puede instanciarse directamente y define métodos que las subclases deben implementar.
Por qué es importante: Define contratos (interfaces) que diferentes implementaciones deben cumplir, facilitando sustitución y testing.
Ejemplo práctico: EventStore(ABC) define append() abstracto; PostgresEventStore y InMemoryEventStore deben implementarlo.
async/await
Definición: Sintaxis de Python para programación asíncrona, permitiendo que el código “espere” operaciones I/O sin bloquear el hilo.
Por qué es importante: Permite manejar miles de conexiones concurrentes con pocos recursos, ideal para APIs que consultan bases de datos.
Ejemplo práctico: await event_store.append(...) libera el hilo mientras PostgreSQL procesa; otras requests pueden atenderse mientras tanto.
FastAPI Lifespan
Definición: Context manager que gestiona el ciclo de vida de la aplicación FastAPI, ejecutando código al iniciar y al apagar.
Por qué es importante: Permite inicializar conexiones a bases de datos al arrancar y cerrarlas limpiamente al apagar.
Ejemplo práctico: El código antes del yield se ejecuta al startup (conectar a DB); el código después se ejecuta al shutdown (desconectar).
pyproject.toml
Definición: Archivo estándar (PEP 518/621) que define metadatos del proyecto Python, dependencias, y configuración de herramientas.
Por qué es importante: Centraliza toda la configuración del proyecto en un solo archivo, reemplazando setup.py, requirements.txt, y archivos de config separados.
Ejemplo práctico: Define nombre, versión, dependencias de producción y desarrollo, y configuración de ruff, mypy, y pytest en un solo lugar.
← Capítulo 13: Go Agregados | Capítulo 15: EventStoreDB con Python →