← Volver al listado de tecnologías

Capítulo 14: Event Sourcing en Python

Por: SiempreListo
event-sourcingpythonfastapisetup

Capítulo 14: Event Sourcing en Python

“Python combina expresividad con productividad para Event Sourcing”

Por Qué Python para Event Sourcing

Python es un lenguaje interpretado conocido por su legibilidad y ecosistema maduro. Para Event Sourcing ofrece:

En este capítulo configuraremos un proyecto Python moderno usando las últimas herramientas del ecosistema.

Setup del Proyecto

# Crear proyecto con uv (gestor de paquetes moderno para Python)
# uv es significativamente más rápido que pip y poetry
mkdir orderflow-es-python && cd orderflow-es-python
uv init

# Instalar dependencias de producción
# fastapi: framework web asíncrono de alto rendimiento
# uvicorn: servidor ASGI para ejecutar FastAPI
# pydantic: validación de datos con type hints
# sqlalchemy: ORM para interactuar con bases de datos
# asyncpg: driver PostgreSQL asíncrono de alto rendimiento
uv add fastapi uvicorn pydantic sqlalchemy asyncpg

# Instalar dependencias de desarrollo
# pytest: framework de testing
# pytest-asyncio: soporte para tests async
# ruff: linter y formatter ultra-rápido
# mypy: verificador de tipos estático
uv add --dev pytest pytest-asyncio pytest-cov ruff mypy

Estructura del Proyecto

La estructura sigue el patrón de clean architecture con separación clara entre capas:

orderflow-es-python/
├── src/                          # Código fuente principal
│   └── orderflow/                # Paquete principal
│       ├── __init__.py
│       ├── domain/
│       │   ├── __init__.py
│       │   ├── events/
│       │   │   ├── __init__.py
│       │   │   ├── base.py
│       │   │   └── order_events.py
│       │   ├── aggregates/
│       │   │   ├── __init__.py
│       │   │   └── order.py
│       │   └── value_objects/
│       │       ├── __init__.py
│       │       ├── address.py
│       │       └── money.py
│       ├── application/
│       │   ├── __init__.py
│       │   ├── commands/
│       │   │   └── create_order.py
│       │   ├── queries/
│       │   │   └── get_order.py
│       │   └── projections/
│       │       └── orders_projection.py
│       ├── infrastructure/
│       │   ├── __init__.py
│       │   ├── event_store/
│       │   │   ├── __init__.py
│       │   │   ├── interface.py
│       │   │   ├── postgres.py
│       │   │   └── inmemory.py
│       │   └── repository/
│       │       └── order_repository.py
│       └── api/
│           ├── __init__.py
│           ├── main.py
│           └── routes/
│               └── orders.py
├── tests/
│   ├── __init__.py
│   ├── domain/
│   │   └── test_order.py
│   └── infrastructure/
│       └── test_event_store.py
├── pyproject.toml
└── docker-compose.yml

Configuración pyproject.toml

[project]
name = "orderflow-es-python"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "fastapi>=0.109.0",
    "uvicorn>=0.27.0",
    "pydantic>=2.5.0",
    "sqlalchemy>=2.0.0",
    "asyncpg>=0.29.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0.0",
    "pytest-asyncio>=0.23.0",
    "pytest-cov>=4.1.0",
    "ruff>=0.1.0",
    "mypy>=1.8.0",
]

[tool.ruff]
line-length = 88
target-version = "py312"

[tool.mypy]
python_version = "3.12"
strict = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

Eventos Base

En Python usamos dataclasses con el decorador frozen=True para crear objetos inmutables. La inmutabilidad es crucial en Event Sourcing: un evento representa un hecho histórico que nunca debe modificarse.

El patrón Generic[T] permite tipar el payload del evento, mejorando el soporte de IDE y detectando errores en tiempo de desarrollo.

# src/orderflow/domain/events/base.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, TypeVar, Generic
from uuid import uuid4

@dataclass(frozen=True)
class EventMetadata:
    correlation_id: str = field(default_factory=lambda: str(uuid4()))
    causation_id: str = field(default_factory=lambda: str(uuid4()))
    user_id: str | None = None
    timestamp: datetime = field(default_factory=datetime.utcnow)

T = TypeVar("T")

@dataclass(frozen=True)
class DomainEvent(Generic[T]):
    event_id: str
    event_type: str
    aggregate_id: str
    aggregate_type: str
    version: int
    payload: T
    metadata: EventMetadata

    @classmethod
    def create(
        cls,
        event_type: str,
        aggregate_id: str,
        aggregate_type: str,
        version: int,
        payload: T,
        metadata: EventMetadata | None = None,
    ) -> "DomainEvent[T]":
        return cls(
            event_id=str(uuid4()),
            event_type=event_type,
            aggregate_id=aggregate_id,
            aggregate_type=aggregate_type,
            version=version,
            payload=payload,
            metadata=metadata or EventMetadata(),
        )

Value Objects

Los Value Objects se definen como dataclasses inmutables (frozen=True). El método __post_init__ se ejecuta automáticamente después del constructor, permitiendo validación sin código repetitivo.

# src/orderflow/domain/value_objects/money.py
from dataclasses import dataclass

@dataclass(frozen=True)
class Money:
    amount: float
    currency: str = "USD"

    def __post_init__(self):
        if self.amount < 0:
            raise ValueError("Amount cannot be negative")
        if len(self.currency) != 3:
            raise ValueError("Currency must be 3 characters")

    def add(self, other: "Money") -> "Money":
        if self.currency != other.currency:
            raise ValueError("Cannot add different currencies")
        return Money(self.amount + other.amount, self.currency)

    def multiply(self, factor: float) -> "Money":
        return Money(self.amount * factor, self.currency)


# src/orderflow/domain/value_objects/address.py
from dataclasses import dataclass

@dataclass(frozen=True)
class Address:
    street: str
    city: str
    state: str
    zip_code: str
    country: str

    def __post_init__(self):
        if not all([self.street, self.city, self.state, self.zip_code]):
            raise ValueError("All address fields are required")

Eventos de Order

# src/orderflow/domain/events/order_events.py
from dataclasses import dataclass
from datetime import datetime

from ..value_objects.money import Money
from ..value_objects.address import Address
from .base import DomainEvent, EventMetadata

@dataclass(frozen=True)
class OrderItem:
    product_id: str
    product_name: str
    sku: str
    quantity: int
    unit_price: Money

@dataclass(frozen=True)
class OrderCreatedPayload:
    customer_id: str
    customer_email: str
    items: tuple[OrderItem, ...]
    shipping_address: Address
    subtotal: Money
    tax: Money
    total: Money

@dataclass(frozen=True)
class OrderItemAddedPayload:
    item: OrderItem
    new_subtotal: Money
    new_total: Money

@dataclass(frozen=True)
class OrderConfirmedPayload:
    confirmed_at: datetime
    estimated_delivery: datetime

@dataclass(frozen=True)
class PaymentReceivedPayload:
    payment_id: str
    amount: Money
    method: str
    transaction_id: str
    paid_at: datetime

@dataclass(frozen=True)
class OrderShippedPayload:
    tracking_number: str
    carrier: str
    shipped_at: datetime
    estimated_delivery: datetime

@dataclass(frozen=True)
class OrderCancelledPayload:
    reason: str
    cancelled_by: str
    cancelled_at: datetime
    refund_required: bool

# Type aliases
OrderCreated = DomainEvent[OrderCreatedPayload]
OrderItemAdded = DomainEvent[OrderItemAddedPayload]
OrderConfirmed = DomainEvent[OrderConfirmedPayload]
PaymentReceived = DomainEvent[PaymentReceivedPayload]
OrderShipped = DomainEvent[OrderShippedPayload]
OrderCancelled = DomainEvent[OrderCancelledPayload]

# Factory functions
def create_order_created(
    order_id: str,
    version: int,
    payload: OrderCreatedPayload,
) -> OrderCreated:
    return DomainEvent.create(
        event_type="OrderCreated",
        aggregate_id=order_id,
        aggregate_type="Order",
        version=version,
        payload=payload,
    )

Event Store Interface

Python usa ABC (Abstract Base Class) para definir interfaces. El decorador @abstractmethod marca métodos que las subclases deben implementar obligatoriamente.

Las operaciones son async porque interactúan con I/O (base de datos), permitiendo que otras tareas se ejecuten mientras esperamos respuestas.

# src/orderflow/infrastructure/event_store/interface.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any

@dataclass
class StoredEvent:
    global_position: int
    stream_id: str
    stream_position: int
    event_type: str
    data: dict[str, Any]
    metadata: dict[str, Any]
    created_at: datetime

@dataclass
class AppendResult:
    next_expected_version: int
    global_position: int
    events_appended: int

class ConcurrencyError(Exception):
    def __init__(self, stream_id: str, expected: int, actual: int):
        self.stream_id = stream_id
        self.expected_version = expected
        self.actual_version = actual
        super().__init__(
            f"Concurrency conflict on {stream_id}: "
            f"expected {expected}, actual {actual}"
        )

class EventStore(ABC):
    @abstractmethod
    async def append(
        self,
        stream_id: str,
        events: list,
        expected_version: int,
    ) -> AppendResult:
        pass

    @abstractmethod
    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
    ) -> list[StoredEvent]:
        pass

    @abstractmethod
    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000,
    ) -> list[StoredEvent]:
        pass

    @abstractmethod
    async def get_stream_version(self, stream_id: str) -> int:
        pass

API con FastAPI

FastAPI es un framework web moderno que combina alto rendimiento (basado en Starlette) con generación automática de documentación OpenAPI. El lifespan context manager gestiona el ciclo de vida de la aplicación (startup/shutdown).

# src/orderflow/api/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    print("Starting up...")
    yield
    # Shutdown
    print("Shutting down...")

app = FastAPI(
    title="OrderFlow Event Sourcing",
    version="1.0.0",
    lifespan=lifespan,
)

@app.get("/health")
async def health():
    return {"status": "ok"}

Docker Compose

# docker-compose.yml
services:
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: orderflow
      POSTGRES_PASSWORD: orderflow
      POSTGRES_DB: orderflow
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgres://orderflow:orderflow@postgres:5432/orderflow
    depends_on:
      - postgres

volumes:
  postgres_data:

Ejecutar el Proyecto

# Desarrollo
uv run uvicorn src.orderflow.api.main:app --reload

# Tests
uv run pytest

# Type checking
uv run mypy src/

# Linting
uv run ruff check src/

Resumen

Glosario

uv (Package Manager)

Definición: Gestor de paquetes y entornos virtuales para Python, escrito en Rust, que reemplaza pip, pip-tools, y virtualenv.

Por qué es importante: Es 10-100x más rápido que pip, maneja resolución de dependencias de forma correcta, y simplifica la gestión de proyectos.

Ejemplo práctico: uv add fastapi instala FastAPI y todas sus dependencias, actualizando pyproject.toml y uv.lock automáticamente.


Dataclass con frozen=True

Definición: Clase de datos en Python donde las instancias son inmutables (no se pueden modificar después de crear).

Por qué es importante: Los eventos deben ser inmutables ya que representan hechos históricos. Intentar modificarlos lanza una excepción.

Ejemplo práctico: @dataclass(frozen=True) class Money crea objetos Money que no pueden cambiar; money.amount = 100 lanzaría FrozenInstanceError.


post_init

Definición: Método especial de dataclasses que se ejecuta automáticamente después del constructor generado.

Por qué es importante: Permite validar datos al crear el objeto sin escribir un __init__ manual.

Ejemplo práctico: En Money.__post_init__, validamos que amount >= 0 y currency tenga 3 caracteres, lanzando ValueError si no cumple.


TypeVar y Generic[T]

Definición: Mecanismos de Python para crear tipos genéricos que funcionan con diferentes tipos concretos manteniendo type safety.

Por qué es importante: Permite que DomainEvent[T] sea tipado correctamente: DomainEvent[OrderCreatedPayload] indica que el payload es de ese tipo específico.

Ejemplo práctico: El IDE puede autocompletar event.payload.customer_id porque sabe que el payload es OrderCreatedPayload, no un dict genérico.


ABC (Abstract Base Class)

Definición: Clase que no puede instanciarse directamente y define métodos que las subclases deben implementar.

Por qué es importante: Define contratos (interfaces) que diferentes implementaciones deben cumplir, facilitando sustitución y testing.

Ejemplo práctico: EventStore(ABC) define append() abstracto; PostgresEventStore y InMemoryEventStore deben implementarlo.


async/await

Definición: Sintaxis de Python para programación asíncrona, permitiendo que el código “espere” operaciones I/O sin bloquear el hilo.

Por qué es importante: Permite manejar miles de conexiones concurrentes con pocos recursos, ideal para APIs que consultan bases de datos.

Ejemplo práctico: await event_store.append(...) libera el hilo mientras PostgreSQL procesa; otras requests pueden atenderse mientras tanto.


FastAPI Lifespan

Definición: Context manager que gestiona el ciclo de vida de la aplicación FastAPI, ejecutando código al iniciar y al apagar.

Por qué es importante: Permite inicializar conexiones a bases de datos al arrancar y cerrarlas limpiamente al apagar.

Ejemplo práctico: El código antes del yield se ejecuta al startup (conectar a DB); el código después se ejecuta al shutdown (desconectar).


pyproject.toml

Definición: Archivo estándar (PEP 518/621) que define metadatos del proyecto Python, dependencias, y configuración de herramientas.

Por qué es importante: Centraliza toda la configuración del proyecto en un solo archivo, reemplazando setup.py, requirements.txt, y archivos de config separados.

Ejemplo práctico: Define nombre, versión, dependencias de producción y desarrollo, y configuración de ruff, mypy, y pytest en un solo lugar.


← Capítulo 13: Go Agregados | Capítulo 15: EventStoreDB con Python →