Capítulo 15: EventStoreDB con Python
Capítulo 15: EventStoreDB con Python
“EventStoreDB es la base de datos diseñada específicamente para Event Sourcing”
Qué es EventStoreDB
EventStoreDB es una base de datos open-source diseñada desde cero para Event Sourcing. A diferencia de usar PostgreSQL o MongoDB como Event Store (donde adaptamos una DB de propósito general), EventStoreDB ofrece:
- Streams nativos: El concepto de stream es ciudadano de primera clase
- Subscripciones en tiempo real: Push de eventos sin polling
- Proyecciones integradas: JavaScript para crear read models dentro de la DB
- Optimistic concurrency: Control de versiones incorporado
- gRPC: Protocolo de comunicación eficiente y tipado
El cliente Python oficial (esdbclient) se comunica con EventStoreDB vía gRPC, proporcionando operaciones tipadas y manejo de errores específico.
Setup de EventStoreDB
# Instalar cliente
uv add esdbclient
# docker-compose.yml
services:
eventstore:
image: eventstore/eventstore:24.2
environment:
# Modo single-node (sin clustering para desarrollo)
EVENTSTORE_CLUSTER_SIZE: 1
# Habilitar proyecciones (JavaScript dentro de EventStoreDB)
EVENTSTORE_RUN_PROJECTIONS: All
# Activar proyecciones de sistema ($by_category, etc.)
EVENTSTORE_START_STANDARD_PROJECTIONS: true
# Desactivar TLS para desarrollo local
EVENTSTORE_INSECURE: true
ports:
# Puerto unificado para HTTP (Admin UI) y gRPC (cliente)
- "2113:2113"
volumes:
- eventstore_data:/var/lib/eventstore
volumes:
eventstore_data:
Implementación del Event Store
La implementación envuelve el cliente oficial de EventStoreDB (esdbclient) adaptándolo a nuestra interfaz EventStore. Los conceptos clave son:
- StreamState.NO_STREAM: Indica que esperamos crear un stream nuevo (versión -1)
- NewEvent: Wrapper para enviar eventos con tipo, datos y metadata
- commit_position: Posición global única del evento en todo el cluster
# src/orderflow/infrastructure/event_store/eventstoredb.py
import json
from datetime import datetime
from typing import Any
from uuid import uuid4
from esdbclient import EventStoreDBClient, NewEvent, StreamState
from .interface import EventStore, StoredEvent, AppendResult, ConcurrencyError
class EventStoreDBStore(EventStore):
def __init__(self, uri: str = "esdb://localhost:2113?tls=false"):
self.client = EventStoreDBClient(uri=uri)
async def append(
self,
stream_id: str,
events: list,
expected_version: int,
) -> AppendResult:
if not events:
return AppendResult(
next_expected_version=expected_version,
global_position=0,
events_appended=0,
)
new_events = [
NewEvent(
type=event.event_type,
data=self._serialize_payload(event.payload),
metadata=self._serialize_metadata(event.metadata),
id=uuid4(),
)
for event in events
]
# Determinar estado esperado
if expected_version == -1:
current_version = StreamState.NO_STREAM
else:
current_version = expected_version
try:
result = self.client.append_to_stream(
stream_name=stream_id,
current_version=current_version,
events=new_events,
)
return AppendResult(
next_expected_version=result.commit_position,
global_position=result.commit_position,
events_appended=len(events),
)
except Exception as e:
if "WrongExpectedVersion" in str(e):
actual = await self.get_stream_version(stream_id)
raise ConcurrencyError(stream_id, expected_version, actual)
raise
async def read_stream(
self,
stream_id: str,
from_version: int = 0,
) -> list[StoredEvent]:
try:
events = list(self.client.get_stream(
stream_name=stream_id,
stream_position=from_version,
))
except Exception:
return []
return [
StoredEvent(
global_position=event.commit_position or 0,
stream_id=event.stream_name,
stream_position=event.stream_position,
event_type=event.type,
data=json.loads(event.data.decode()) if event.data else {},
metadata=json.loads(event.metadata.decode()) if event.metadata else {},
created_at=datetime.utcnow(), # EventStoreDB no expone timestamp fácilmente
)
for event in events
]
async def read_all(
self,
from_position: int = 0,
limit: int = 1000,
) -> list[StoredEvent]:
events = list(self.client.read_all(
commit_position=from_position,
limit=limit,
))
return [
StoredEvent(
global_position=event.commit_position or 0,
stream_id=event.stream_name,
stream_position=event.stream_position,
event_type=event.type,
data=json.loads(event.data.decode()) if event.data else {},
metadata=json.loads(event.metadata.decode()) if event.metadata else {},
created_at=datetime.utcnow(),
)
for event in events
if not event.stream_name.startswith("$") # Filtrar streams de sistema
]
async def get_stream_version(self, stream_id: str) -> int:
try:
events = list(self.client.get_stream(
stream_name=stream_id,
backwards=True,
limit=1,
))
if events:
return events[0].stream_position
return -1
except Exception:
return -1
def _serialize_payload(self, payload: Any) -> bytes:
if hasattr(payload, "__dict__"):
data = self._dataclass_to_dict(payload)
else:
data = payload
return json.dumps(data).encode()
def _serialize_metadata(self, metadata: Any) -> bytes:
if hasattr(metadata, "__dict__"):
data = self._dataclass_to_dict(metadata)
else:
data = metadata
return json.dumps(data).encode()
def _dataclass_to_dict(self, obj: Any) -> dict:
if hasattr(obj, "__dataclass_fields__"):
result = {}
for field_name in obj.__dataclass_fields__:
value = getattr(obj, field_name)
result[field_name] = self._dataclass_to_dict(value)
return result
elif isinstance(obj, (list, tuple)):
return [self._dataclass_to_dict(item) for item in obj]
elif isinstance(obj, datetime):
return obj.isoformat()
return obj
def close(self):
self.client.close()
Subscripciones
Las subscripciones permiten recibir eventos en tiempo real sin hacer polling. EventStoreDB mantiene la conexión abierta y envía cada nuevo evento apenas se persiste.
Hay dos tipos principales:
- subscribe_to_all: Recibe todos los eventos del sistema (para proyecciones globales)
- subscribe_to_stream: Recibe solo eventos de un stream específico
El parámetro commit_position permite reanudar desde donde se dejó si la conexión se pierde.
# src/orderflow/infrastructure/event_store/subscription.py
import asyncio
from typing import Callable, Awaitable
from esdbclient import EventStoreDBClient, CatchupSubscription
from .interface import StoredEvent
EventHandler = Callable[[StoredEvent], Awaitable[None]]
class EventStoreSubscription:
def __init__(self, client: EventStoreDBClient):
self.client = client
self._running = False
async def subscribe_to_all(
self,
handler: EventHandler,
from_position: int | None = None,
):
self._running = True
subscription = self.client.subscribe_to_all(
commit_position=from_position,
)
try:
for event in subscription:
if not self._running:
break
# Filtrar eventos de sistema
if event.stream_name.startswith("$"):
continue
stored = StoredEvent(
global_position=event.commit_position or 0,
stream_id=event.stream_name,
stream_position=event.stream_position,
event_type=event.type,
data=json.loads(event.data.decode()) if event.data else {},
metadata={},
created_at=datetime.utcnow(),
)
await handler(stored)
finally:
subscription.stop()
async def subscribe_to_stream(
self,
stream_id: str,
handler: EventHandler,
from_version: int | None = None,
):
self._running = True
subscription = self.client.subscribe_to_stream(
stream_name=stream_id,
stream_position=from_version,
)
try:
for event in subscription:
if not self._running:
break
stored = StoredEvent(
global_position=event.commit_position or 0,
stream_id=event.stream_name,
stream_position=event.stream_position,
event_type=event.type,
data=json.loads(event.data.decode()) if event.data else {},
metadata={},
created_at=datetime.utcnow(),
)
await handler(stored)
finally:
subscription.stop()
def stop(self):
self._running = False
Agregado Order en Python
El agregado Order sigue el mismo patrón que en Go y TypeScript: comandos generan eventos, eventos modifican estado. En Python usamos:
- Enum para estados tipados con autocompletado
- Properties para exponer estado sin permitir modificación directa
- Diccionario de handlers para despacho dinámico de eventos
# src/orderflow/domain/aggregates/order.py
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Any
from uuid import uuid4
from ..events.base import DomainEvent, EventMetadata
from ..events.order_events import (
OrderItem, OrderCreatedPayload, OrderItemAddedPayload,
OrderConfirmedPayload, PaymentReceivedPayload,
OrderShippedPayload, OrderCancelledPayload,
)
from ..value_objects.money import Money
from ..value_objects.address import Address
class OrderStatus(Enum):
DRAFT = "draft"
CONFIRMED = "confirmed"
PAID = "paid"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
@dataclass
class ItemState:
product_id: str
product_name: str
sku: str
quantity: int
unit_price: Money
class Order:
def __init__(self):
self._id: str = ""
self._customer_id: str = ""
self._customer_email: str = ""
self._items: dict[str, ItemState] = {}
self._shipping_address: Address | None = None
self._status: OrderStatus = OrderStatus.DRAFT
self._subtotal: Money = Money(0)
self._tax: Money = Money(0)
self._total: Money = Money(0)
self._version: int = -1
self._uncommitted: list[DomainEvent] = []
# Properties
@property
def id(self) -> str:
return self._id
@property
def status(self) -> OrderStatus:
return self._status
@property
def version(self) -> int:
return self._version
@property
def total(self) -> Money:
return self._total
@property
def uncommitted_events(self) -> list[DomainEvent]:
return self._uncommitted.copy()
def clear_uncommitted_events(self):
self._uncommitted.clear()
# Factory methods
@classmethod
def create(
cls,
customer_id: str,
customer_email: str,
items: list[OrderItem],
shipping_address: Address,
) -> "Order":
if not items:
raise ValueError("Order must have at least one item")
order = cls()
order_id = str(uuid4())
subtotal = sum(
item.unit_price.amount * item.quantity
for item in items
)
tax = subtotal * 0.08
total = subtotal + tax
payload = OrderCreatedPayload(
customer_id=customer_id,
customer_email=customer_email,
items=tuple(items),
shipping_address=shipping_address,
subtotal=Money(subtotal),
tax=Money(tax),
total=Money(total),
)
event = DomainEvent.create(
event_type="OrderCreated",
aggregate_id=order_id,
aggregate_type="Order",
version=0,
payload=payload,
)
order._apply(event)
return order
@classmethod
def from_events(cls, events: list[DomainEvent]) -> "Order":
if not events:
raise ValueError("Cannot rehydrate without events")
order = cls()
for event in events:
order._when(event)
return order
# Commands
def confirm(self) -> None:
self._assert_status(OrderStatus.DRAFT)
if not self._items:
raise ValueError("Cannot confirm empty order")
payload = OrderConfirmedPayload(
confirmed_at=datetime.utcnow(),
estimated_delivery=datetime.utcnow() + timedelta(days=5),
)
self._apply_new("OrderConfirmed", payload)
def receive_payment(
self,
payment_id: str,
amount: Money,
method: str,
transaction_id: str,
) -> None:
self._assert_status(OrderStatus.CONFIRMED)
if amount.amount < self._total.amount:
raise ValueError("Payment less than total")
payload = PaymentReceivedPayload(
payment_id=payment_id,
amount=amount,
method=method,
transaction_id=transaction_id,
paid_at=datetime.utcnow(),
)
self._apply_new("PaymentReceived", payload)
def ship(self, tracking_number: str, carrier: str) -> None:
self._assert_status(OrderStatus.PAID)
payload = OrderShippedPayload(
tracking_number=tracking_number,
carrier=carrier,
shipped_at=datetime.utcnow(),
estimated_delivery=datetime.utcnow() + timedelta(days=3),
)
self._apply_new("OrderShipped", payload)
def cancel(self, reason: str, cancelled_by: str) -> None:
if self._status in (OrderStatus.SHIPPED, OrderStatus.DELIVERED, OrderStatus.CANCELLED):
raise ValueError(f"Cannot cancel in status {self._status}")
payload = OrderCancelledPayload(
reason=reason,
cancelled_by=cancelled_by,
cancelled_at=datetime.utcnow(),
refund_required=self._status == OrderStatus.PAID,
)
self._apply_new("OrderCancelled", payload)
# Event handlers
def _when(self, event: DomainEvent) -> None:
handlers = {
"OrderCreated": self._on_order_created,
"OrderItemAdded": self._on_order_item_added,
"OrderConfirmed": lambda e: setattr(self, "_status", OrderStatus.CONFIRMED),
"PaymentReceived": lambda e: setattr(self, "_status", OrderStatus.PAID),
"OrderShipped": lambda e: setattr(self, "_status", OrderStatus.SHIPPED),
"OrderDelivered": lambda e: setattr(self, "_status", OrderStatus.DELIVERED),
"OrderCancelled": lambda e: setattr(self, "_status", OrderStatus.CANCELLED),
}
handler = handlers.get(event.event_type)
if handler:
handler(event)
self._version = event.version
def _on_order_created(self, event: DomainEvent) -> None:
payload: OrderCreatedPayload = event.payload
self._id = event.aggregate_id
self._customer_id = payload.customer_id
self._customer_email = payload.customer_email
self._shipping_address = payload.shipping_address
self._subtotal = payload.subtotal
self._tax = payload.tax
self._total = payload.total
self._status = OrderStatus.DRAFT
for item in payload.items:
self._items[item.product_id] = ItemState(
product_id=item.product_id,
product_name=item.product_name,
sku=item.sku,
quantity=item.quantity,
unit_price=item.unit_price,
)
def _on_order_item_added(self, event: DomainEvent) -> None:
payload: OrderItemAddedPayload = event.payload
item = payload.item
self._items[item.product_id] = ItemState(
product_id=item.product_id,
product_name=item.product_name,
sku=item.sku,
quantity=item.quantity,
unit_price=item.unit_price,
)
self._subtotal = payload.new_subtotal
self._total = payload.new_total
def _apply(self, event: DomainEvent) -> None:
self._uncommitted.append(event)
self._when(event)
def _apply_new(self, event_type: str, payload: Any) -> None:
event = DomainEvent.create(
event_type=event_type,
aggregate_id=self._id,
aggregate_type="Order",
version=self._version + 1,
payload=payload,
)
self._apply(event)
def _assert_status(self, expected: OrderStatus) -> None:
if self._status != expected:
raise ValueError(f"Invalid status: {self._status}")
Resumen
- EventStoreDB es la solución nativa para Event Sourcing
- El cliente Python usa gRPC para comunicación
- Las subscripciones permiten reactividad en tiempo real
- Los agregados siguen el mismo patrón que en otros lenguajes
- La serialización convierte dataclasses a JSON
Glosario
EventStoreDB
Definición: Base de datos open-source diseñada específicamente para Event Sourcing, con streams, subscripciones y proyecciones como conceptos nativos.
Por qué es importante: Elimina la necesidad de implementar lógica de Event Store sobre bases de datos genéricas, reduciendo código y errores.
Ejemplo práctico: En lugar de crear tablas de eventos, índices y lógica de concurrencia en PostgreSQL, EventStoreDB lo proporciona out-of-the-box.
gRPC
Definición: Framework de llamadas a procedimientos remotos (RPC) desarrollado por Google que usa Protocol Buffers para serialización y HTTP/2 para transporte.
Por qué es importante: Es más eficiente que REST+JSON: conexiones persistentes, streaming bidireccional, y serialización binaria compacta.
Ejemplo práctico: El cliente esdbclient mantiene una conexión gRPC abierta con EventStoreDB, permitiendo subscripciones en tiempo real sin reconectar.
Stream State (NO_STREAM, ANY, STREAM_EXISTS)
Definición: Constantes que indican el estado esperado del stream al escribir eventos para control de concurrencia.
Por qué es importante: NO_STREAM falla si el stream ya existe (crear nuevo), STREAM_EXISTS falla si no existe, y números específicos verifican la versión exacta.
Ejemplo práctico: Al crear una orden, usamos NO_STREAM; si otro proceso ya la creó, recibimos WrongExpectedVersion y podemos reintentar o informar al usuario.
Commit Position
Definición: Número único global que identifica la posición de un evento en todo el cluster de EventStoreDB, independiente del stream.
Por qué es importante: Permite ordenar todos los eventos del sistema cronológicamente y reanudar subscripciones exactamente donde se dejaron.
Ejemplo práctico: Un worker de proyecciones guarda su último commit_position procesado; al reiniciar, continúa desde ese punto sin reprocesar eventos.
Catch-up Subscription
Definición: Subscripción que primero lee eventos históricos desde una posición y luego cambia a modo tiempo real cuando alcanza el final.
Por qué es importante: Permite reconstruir proyecciones desde cero y luego mantenerse actualizada sin perder eventos durante la transición.
Ejemplo práctico: Nueva instancia de servicio se suscribe desde posición 0, procesa millones de eventos históricos, y cuando alcanza el presente continúa en tiempo real.
Streams de Sistema ($)
Definición: Streams internos de EventStoreDB cuyos nombres empiezan con $, usados para metadata, proyecciones y configuración.
Por qué es importante: Deben filtrarse al procesar eventos de usuario para evitar procesar metadata interna como si fueran eventos de negocio.
Ejemplo práctico: $ce-Order es un stream de sistema que agrupa eventos por categoría; if stream_name.startswith("$"): continue los ignora en proyecciones.
Enum en Python
Definición: Tipo especial que define un conjunto finito de valores con nombre, proporcionando type safety y autocompletado.
Por qué es importante: Evita errores de typos en strings (como "comfirmed" vs "confirmed") y permite que el IDE sugiera valores válidos.
Ejemplo práctico: OrderStatus.CONFIRMED es un valor tipado; comparar con order.status == "confirmed" (string) puede fallar silenciosamente por typos.
Property en Python
Definición: Decorador que permite acceder a métodos como si fueran atributos, controlando lectura y escritura.
Por qué es importante: Expone estado del agregado de forma readonly sin permitir modificación directa que saltaría la lógica de eventos.
Ejemplo práctico: @property def status(self) -> OrderStatus: return self._status permite leer order.status pero order.status = X falla.
← Capítulo 14: Python Setup | Capítulo 16: Snapshots y Optimización →