← Volver al listado de tecnologías

Capítulo 15: EventStoreDB con Python

Por: SiempreListo
event-sourcingpythoneventstoredbgrpc

Capítulo 15: EventStoreDB con Python

“EventStoreDB es la base de datos diseñada específicamente para Event Sourcing”

Qué es EventStoreDB

EventStoreDB es una base de datos open-source diseñada desde cero para Event Sourcing. A diferencia de usar PostgreSQL o MongoDB como Event Store (donde adaptamos una DB de propósito general), EventStoreDB ofrece:

El cliente Python oficial (esdbclient) se comunica con EventStoreDB vía gRPC, proporcionando operaciones tipadas y manejo de errores específico.

Setup de EventStoreDB

# Instalar cliente
uv add esdbclient
# docker-compose.yml
services:
  eventstore:
    image: eventstore/eventstore:24.2
    environment:
      # Modo single-node (sin clustering para desarrollo)
      EVENTSTORE_CLUSTER_SIZE: 1
      # Habilitar proyecciones (JavaScript dentro de EventStoreDB)
      EVENTSTORE_RUN_PROJECTIONS: All
      # Activar proyecciones de sistema ($by_category, etc.)
      EVENTSTORE_START_STANDARD_PROJECTIONS: true
      # Desactivar TLS para desarrollo local
      EVENTSTORE_INSECURE: true
    ports:
      # Puerto unificado para HTTP (Admin UI) y gRPC (cliente)
      - "2113:2113"
    volumes:
      - eventstore_data:/var/lib/eventstore

volumes:
  eventstore_data:

Implementación del Event Store

La implementación envuelve el cliente oficial de EventStoreDB (esdbclient) adaptándolo a nuestra interfaz EventStore. Los conceptos clave son:

# src/orderflow/infrastructure/event_store/eventstoredb.py
import json
from datetime import datetime
from typing import Any
from uuid import uuid4

from esdbclient import EventStoreDBClient, NewEvent, StreamState

from .interface import EventStore, StoredEvent, AppendResult, ConcurrencyError

class EventStoreDBStore(EventStore):
    def __init__(self, uri: str = "esdb://localhost:2113?tls=false"):
        self.client = EventStoreDBClient(uri=uri)

    async def append(
        self,
        stream_id: str,
        events: list,
        expected_version: int,
    ) -> AppendResult:
        if not events:
            return AppendResult(
                next_expected_version=expected_version,
                global_position=0,
                events_appended=0,
            )

        new_events = [
            NewEvent(
                type=event.event_type,
                data=self._serialize_payload(event.payload),
                metadata=self._serialize_metadata(event.metadata),
                id=uuid4(),
            )
            for event in events
        ]

        # Determinar estado esperado
        if expected_version == -1:
            current_version = StreamState.NO_STREAM
        else:
            current_version = expected_version

        try:
            result = self.client.append_to_stream(
                stream_name=stream_id,
                current_version=current_version,
                events=new_events,
            )

            return AppendResult(
                next_expected_version=result.commit_position,
                global_position=result.commit_position,
                events_appended=len(events),
            )
        except Exception as e:
            if "WrongExpectedVersion" in str(e):
                actual = await self.get_stream_version(stream_id)
                raise ConcurrencyError(stream_id, expected_version, actual)
            raise

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
    ) -> list[StoredEvent]:
        try:
            events = list(self.client.get_stream(
                stream_name=stream_id,
                stream_position=from_version,
            ))
        except Exception:
            return []

        return [
            StoredEvent(
                global_position=event.commit_position or 0,
                stream_id=event.stream_name,
                stream_position=event.stream_position,
                event_type=event.type,
                data=json.loads(event.data.decode()) if event.data else {},
                metadata=json.loads(event.metadata.decode()) if event.metadata else {},
                created_at=datetime.utcnow(),  # EventStoreDB no expone timestamp fácilmente
            )
            for event in events
        ]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000,
    ) -> list[StoredEvent]:
        events = list(self.client.read_all(
            commit_position=from_position,
            limit=limit,
        ))

        return [
            StoredEvent(
                global_position=event.commit_position or 0,
                stream_id=event.stream_name,
                stream_position=event.stream_position,
                event_type=event.type,
                data=json.loads(event.data.decode()) if event.data else {},
                metadata=json.loads(event.metadata.decode()) if event.metadata else {},
                created_at=datetime.utcnow(),
            )
            for event in events
            if not event.stream_name.startswith("$")  # Filtrar streams de sistema
        ]

    async def get_stream_version(self, stream_id: str) -> int:
        try:
            events = list(self.client.get_stream(
                stream_name=stream_id,
                backwards=True,
                limit=1,
            ))
            if events:
                return events[0].stream_position
            return -1
        except Exception:
            return -1

    def _serialize_payload(self, payload: Any) -> bytes:
        if hasattr(payload, "__dict__"):
            data = self._dataclass_to_dict(payload)
        else:
            data = payload
        return json.dumps(data).encode()

    def _serialize_metadata(self, metadata: Any) -> bytes:
        if hasattr(metadata, "__dict__"):
            data = self._dataclass_to_dict(metadata)
        else:
            data = metadata
        return json.dumps(data).encode()

    def _dataclass_to_dict(self, obj: Any) -> dict:
        if hasattr(obj, "__dataclass_fields__"):
            result = {}
            for field_name in obj.__dataclass_fields__:
                value = getattr(obj, field_name)
                result[field_name] = self._dataclass_to_dict(value)
            return result
        elif isinstance(obj, (list, tuple)):
            return [self._dataclass_to_dict(item) for item in obj]
        elif isinstance(obj, datetime):
            return obj.isoformat()
        return obj

    def close(self):
        self.client.close()

Subscripciones

Las subscripciones permiten recibir eventos en tiempo real sin hacer polling. EventStoreDB mantiene la conexión abierta y envía cada nuevo evento apenas se persiste.

Hay dos tipos principales:

El parámetro commit_position permite reanudar desde donde se dejó si la conexión se pierde.

# src/orderflow/infrastructure/event_store/subscription.py
import asyncio
from typing import Callable, Awaitable
from esdbclient import EventStoreDBClient, CatchupSubscription

from .interface import StoredEvent

EventHandler = Callable[[StoredEvent], Awaitable[None]]

class EventStoreSubscription:
    def __init__(self, client: EventStoreDBClient):
        self.client = client
        self._running = False

    async def subscribe_to_all(
        self,
        handler: EventHandler,
        from_position: int | None = None,
    ):
        self._running = True

        subscription = self.client.subscribe_to_all(
            commit_position=from_position,
        )

        try:
            for event in subscription:
                if not self._running:
                    break

                # Filtrar eventos de sistema
                if event.stream_name.startswith("$"):
                    continue

                stored = StoredEvent(
                    global_position=event.commit_position or 0,
                    stream_id=event.stream_name,
                    stream_position=event.stream_position,
                    event_type=event.type,
                    data=json.loads(event.data.decode()) if event.data else {},
                    metadata={},
                    created_at=datetime.utcnow(),
                )

                await handler(stored)

        finally:
            subscription.stop()

    async def subscribe_to_stream(
        self,
        stream_id: str,
        handler: EventHandler,
        from_version: int | None = None,
    ):
        self._running = True

        subscription = self.client.subscribe_to_stream(
            stream_name=stream_id,
            stream_position=from_version,
        )

        try:
            for event in subscription:
                if not self._running:
                    break

                stored = StoredEvent(
                    global_position=event.commit_position or 0,
                    stream_id=event.stream_name,
                    stream_position=event.stream_position,
                    event_type=event.type,
                    data=json.loads(event.data.decode()) if event.data else {},
                    metadata={},
                    created_at=datetime.utcnow(),
                )

                await handler(stored)

        finally:
            subscription.stop()

    def stop(self):
        self._running = False

Agregado Order en Python

El agregado Order sigue el mismo patrón que en Go y TypeScript: comandos generan eventos, eventos modifican estado. En Python usamos:

# src/orderflow/domain/aggregates/order.py
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
from uuid import uuid4

from ..events.base import DomainEvent, EventMetadata
from ..events.order_events import (
    OrderItem, OrderCreatedPayload, OrderItemAddedPayload,
    OrderConfirmedPayload, PaymentReceivedPayload,
    OrderShippedPayload, OrderCancelledPayload,
)
from ..value_objects.money import Money
from ..value_objects.address import Address

class OrderStatus(Enum):
    DRAFT = "draft"
    CONFIRMED = "confirmed"
    PAID = "paid"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

@dataclass
class ItemState:
    product_id: str
    product_name: str
    sku: str
    quantity: int
    unit_price: Money

class Order:
    def __init__(self):
        self._id: str = ""
        self._customer_id: str = ""
        self._customer_email: str = ""
        self._items: dict[str, ItemState] = {}
        self._shipping_address: Address | None = None
        self._status: OrderStatus = OrderStatus.DRAFT
        self._subtotal: Money = Money(0)
        self._tax: Money = Money(0)
        self._total: Money = Money(0)
        self._version: int = -1
        self._uncommitted: list[DomainEvent] = []

    # Properties
    @property
    def id(self) -> str:
        return self._id

    @property
    def status(self) -> OrderStatus:
        return self._status

    @property
    def version(self) -> int:
        return self._version

    @property
    def total(self) -> Money:
        return self._total

    @property
    def uncommitted_events(self) -> list[DomainEvent]:
        return self._uncommitted.copy()

    def clear_uncommitted_events(self):
        self._uncommitted.clear()

    # Factory methods
    @classmethod
    def create(
        cls,
        customer_id: str,
        customer_email: str,
        items: list[OrderItem],
        shipping_address: Address,
    ) -> "Order":
        if not items:
            raise ValueError("Order must have at least one item")

        order = cls()
        order_id = str(uuid4())

        subtotal = sum(
            item.unit_price.amount * item.quantity
            for item in items
        )
        tax = subtotal * 0.08
        total = subtotal + tax

        payload = OrderCreatedPayload(
            customer_id=customer_id,
            customer_email=customer_email,
            items=tuple(items),
            shipping_address=shipping_address,
            subtotal=Money(subtotal),
            tax=Money(tax),
            total=Money(total),
        )

        event = DomainEvent.create(
            event_type="OrderCreated",
            aggregate_id=order_id,
            aggregate_type="Order",
            version=0,
            payload=payload,
        )

        order._apply(event)
        return order

    @classmethod
    def from_events(cls, events: list[DomainEvent]) -> "Order":
        if not events:
            raise ValueError("Cannot rehydrate without events")

        order = cls()
        for event in events:
            order._when(event)
        return order

    # Commands
    def confirm(self) -> None:
        self._assert_status(OrderStatus.DRAFT)

        if not self._items:
            raise ValueError("Cannot confirm empty order")

        payload = OrderConfirmedPayload(
            confirmed_at=datetime.utcnow(),
            estimated_delivery=datetime.utcnow() + timedelta(days=5),
        )

        self._apply_new("OrderConfirmed", payload)

    def receive_payment(
        self,
        payment_id: str,
        amount: Money,
        method: str,
        transaction_id: str,
    ) -> None:
        self._assert_status(OrderStatus.CONFIRMED)

        if amount.amount < self._total.amount:
            raise ValueError("Payment less than total")

        payload = PaymentReceivedPayload(
            payment_id=payment_id,
            amount=amount,
            method=method,
            transaction_id=transaction_id,
            paid_at=datetime.utcnow(),
        )

        self._apply_new("PaymentReceived", payload)

    def ship(self, tracking_number: str, carrier: str) -> None:
        self._assert_status(OrderStatus.PAID)

        payload = OrderShippedPayload(
            tracking_number=tracking_number,
            carrier=carrier,
            shipped_at=datetime.utcnow(),
            estimated_delivery=datetime.utcnow() + timedelta(days=3),
        )

        self._apply_new("OrderShipped", payload)

    def cancel(self, reason: str, cancelled_by: str) -> None:
        if self._status in (OrderStatus.SHIPPED, OrderStatus.DELIVERED, OrderStatus.CANCELLED):
            raise ValueError(f"Cannot cancel in status {self._status}")

        payload = OrderCancelledPayload(
            reason=reason,
            cancelled_by=cancelled_by,
            cancelled_at=datetime.utcnow(),
            refund_required=self._status == OrderStatus.PAID,
        )

        self._apply_new("OrderCancelled", payload)

    # Event handlers
    def _when(self, event: DomainEvent) -> None:
        handlers = {
            "OrderCreated": self._on_order_created,
            "OrderItemAdded": self._on_order_item_added,
            "OrderConfirmed": lambda e: setattr(self, "_status", OrderStatus.CONFIRMED),
            "PaymentReceived": lambda e: setattr(self, "_status", OrderStatus.PAID),
            "OrderShipped": lambda e: setattr(self, "_status", OrderStatus.SHIPPED),
            "OrderDelivered": lambda e: setattr(self, "_status", OrderStatus.DELIVERED),
            "OrderCancelled": lambda e: setattr(self, "_status", OrderStatus.CANCELLED),
        }

        handler = handlers.get(event.event_type)
        if handler:
            handler(event)

        self._version = event.version

    def _on_order_created(self, event: DomainEvent) -> None:
        payload: OrderCreatedPayload = event.payload
        self._id = event.aggregate_id
        self._customer_id = payload.customer_id
        self._customer_email = payload.customer_email
        self._shipping_address = payload.shipping_address
        self._subtotal = payload.subtotal
        self._tax = payload.tax
        self._total = payload.total
        self._status = OrderStatus.DRAFT

        for item in payload.items:
            self._items[item.product_id] = ItemState(
                product_id=item.product_id,
                product_name=item.product_name,
                sku=item.sku,
                quantity=item.quantity,
                unit_price=item.unit_price,
            )

    def _on_order_item_added(self, event: DomainEvent) -> None:
        payload: OrderItemAddedPayload = event.payload
        item = payload.item
        self._items[item.product_id] = ItemState(
            product_id=item.product_id,
            product_name=item.product_name,
            sku=item.sku,
            quantity=item.quantity,
            unit_price=item.unit_price,
        )
        self._subtotal = payload.new_subtotal
        self._total = payload.new_total

    def _apply(self, event: DomainEvent) -> None:
        self._uncommitted.append(event)
        self._when(event)

    def _apply_new(self, event_type: str, payload: Any) -> None:
        event = DomainEvent.create(
            event_type=event_type,
            aggregate_id=self._id,
            aggregate_type="Order",
            version=self._version + 1,
            payload=payload,
        )
        self._apply(event)

    def _assert_status(self, expected: OrderStatus) -> None:
        if self._status != expected:
            raise ValueError(f"Invalid status: {self._status}")

Resumen

Glosario

EventStoreDB

Definición: Base de datos open-source diseñada específicamente para Event Sourcing, con streams, subscripciones y proyecciones como conceptos nativos.

Por qué es importante: Elimina la necesidad de implementar lógica de Event Store sobre bases de datos genéricas, reduciendo código y errores.

Ejemplo práctico: En lugar de crear tablas de eventos, índices y lógica de concurrencia en PostgreSQL, EventStoreDB lo proporciona out-of-the-box.


gRPC

Definición: Framework de llamadas a procedimientos remotos (RPC) desarrollado por Google que usa Protocol Buffers para serialización y HTTP/2 para transporte.

Por qué es importante: Es más eficiente que REST+JSON: conexiones persistentes, streaming bidireccional, y serialización binaria compacta.

Ejemplo práctico: El cliente esdbclient mantiene una conexión gRPC abierta con EventStoreDB, permitiendo subscripciones en tiempo real sin reconectar.


Stream State (NO_STREAM, ANY, STREAM_EXISTS)

Definición: Constantes que indican el estado esperado del stream al escribir eventos para control de concurrencia.

Por qué es importante: NO_STREAM falla si el stream ya existe (crear nuevo), STREAM_EXISTS falla si no existe, y números específicos verifican la versión exacta.

Ejemplo práctico: Al crear una orden, usamos NO_STREAM; si otro proceso ya la creó, recibimos WrongExpectedVersion y podemos reintentar o informar al usuario.


Commit Position

Definición: Número único global que identifica la posición de un evento en todo el cluster de EventStoreDB, independiente del stream.

Por qué es importante: Permite ordenar todos los eventos del sistema cronológicamente y reanudar subscripciones exactamente donde se dejaron.

Ejemplo práctico: Un worker de proyecciones guarda su último commit_position procesado; al reiniciar, continúa desde ese punto sin reprocesar eventos.


Catch-up Subscription

Definición: Subscripción que primero lee eventos históricos desde una posición y luego cambia a modo tiempo real cuando alcanza el final.

Por qué es importante: Permite reconstruir proyecciones desde cero y luego mantenerse actualizada sin perder eventos durante la transición.

Ejemplo práctico: Nueva instancia de servicio se suscribe desde posición 0, procesa millones de eventos históricos, y cuando alcanza el presente continúa en tiempo real.


Streams de Sistema ($)

Definición: Streams internos de EventStoreDB cuyos nombres empiezan con $, usados para metadata, proyecciones y configuración.

Por qué es importante: Deben filtrarse al procesar eventos de usuario para evitar procesar metadata interna como si fueran eventos de negocio.

Ejemplo práctico: $ce-Order es un stream de sistema que agrupa eventos por categoría; if stream_name.startswith("$"): continue los ignora en proyecciones.


Enum en Python

Definición: Tipo especial que define un conjunto finito de valores con nombre, proporcionando type safety y autocompletado.

Por qué es importante: Evita errores de typos en strings (como "comfirmed" vs "confirmed") y permite que el IDE sugiera valores válidos.

Ejemplo práctico: OrderStatus.CONFIRMED es un valor tipado; comparar con order.status == "confirmed" (string) puede fallar silenciosamente por typos.


Property en Python

Definición: Decorador que permite acceder a métodos como si fueran atributos, controlando lectura y escritura.

Por qué es importante: Expone estado del agregado de forma readonly sin permitir modificación directa que saltaría la lógica de eventos.

Ejemplo práctico: @property def status(self) -> OrderStatus: return self._status permite leer order.status pero order.status = X falla.


← Capítulo 14: Python Setup | Capítulo 16: Snapshots y Optimización →