Capítulo 16: Sagas en Python
Capítulo 16: Sagas en Python
“Python y asyncio: sagas elegantes y legibles”
Introduccion
Despues de explorar Go con Temporal, ahora implementaremos sagas en Python usando su biblioteca nativa de programacion asincrona: asyncio.
Python es conocido por su sintaxis clara y legible, lo que lo hace ideal para expresar flujos de negocio complejos de manera comprensible. Con asyncio, podemos ejecutar multiples operaciones de I/O concurrentemente sin necesidad de threads.
Esta implementacion es “manual” - construimos nuestra propia infraestructura de saga. Es util para entender los conceptos fundamentales y para sistemas donde Temporal u otras plataformas no son viables.
Arquitectura Base
La estructura del proyecto sigue el principio de separacion de responsabilidades:
saga/: Logica del patron saga (base, steps, orchestrator)services/: Servicios de negocio que ejecutan operaciones realesmain.py: Punto de entrada de la aplicacion
orderflow/
├── saga/
│ ├── __init__.py
│ ├── base.py
│ ├── steps.py
│ └── orchestrator.py
├── services/
│ ├── order.py
│ ├── inventory.py
│ └── payment.py
└── main.py
Definicion Base de Saga
Comenzamos definiendo las abstracciones base. En Python usamos:
- ABC (Abstract Base Class): Clase que define metodos abstractos que las subclases deben implementar
- dataclass: Decorador que genera automaticamente
__init__,__repr__y otros metodos para clases de datos - TypeVar y Generic: Permiten crear clases y funciones tipadas genericamente
- Enum: Define conjuntos de constantes con nombre
La clase SagaStep es generica sobre T, lo que significa que puede trabajar con cualquier tipo de contexto que definamos.
# saga/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar
from enum import Enum
import asyncio
class StepStatus(Enum):
PENDING = "pending"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATED = "compensated"
@dataclass
class StepResult:
name: str
status: StepStatus
result: Any = None
error: str | None = None
T = TypeVar('T')
class SagaStep(ABC, Generic[T]):
def __init__(self, name: str):
self.name = name
@abstractmethod
async def execute(self, context: T) -> Any:
pass
@abstractmethod
async def compensate(self, context: T) -> None:
pass
@dataclass
class SagaContext:
order_id: str | None = None
customer_id: str = ""
items: list = field(default_factory=list)
total: float = 0.0
reservation_id: str | None = None
payment_id: str | None = None
shipment_id: str | None = None
Steps de la Saga
Cada step implementa la interfaz SagaStep con dos metodos:
execute(): Realiza la operacion (crear orden, reservar stock, etc.)compensate(): Deshace la operacion si es necesario
Los steps reciben los servicios que necesitan via inyeccion de dependencias en el constructor. Esto facilita el testing al permitir inyectar mocks.
El contexto (SagaContext) se va enriqueciendo a medida que cada step agrega informacion (IDs creados, resultados, etc.).
# saga/steps.py
from saga.base import SagaStep, SagaContext
from services.order import OrderService
from services.inventory import InventoryService
from services.payment import PaymentService
class CreateOrderStep(SagaStep[SagaContext]):
def __init__(self, order_service: OrderService):
super().__init__("create_order")
self.order_service = order_service
async def execute(self, context: SagaContext) -> str:
order = await self.order_service.create(
customer_id=context.customer_id,
items=context.items,
total=context.total
)
context.order_id = order.id
return order.id
async def compensate(self, context: SagaContext) -> None:
if context.order_id:
await self.order_service.cancel(context.order_id)
class ReserveStockStep(SagaStep[SagaContext]):
def __init__(self, inventory_service: InventoryService):
super().__init__("reserve_stock")
self.inventory_service = inventory_service
async def execute(self, context: SagaContext) -> str:
reservation = await self.inventory_service.reserve(
order_id=context.order_id,
items=context.items
)
context.reservation_id = reservation.id
return reservation.id
async def compensate(self, context: SagaContext) -> None:
if context.reservation_id:
await self.inventory_service.release(context.reservation_id)
class ProcessPaymentStep(SagaStep[SagaContext]):
def __init__(self, payment_service: PaymentService):
super().__init__("process_payment")
self.payment_service = payment_service
async def execute(self, context: SagaContext) -> str:
payment = await self.payment_service.process(
order_id=context.order_id,
amount=context.total
)
context.payment_id = payment.id
return payment.id
async def compensate(self, context: SagaContext) -> None:
if context.payment_id:
await self.payment_service.refund(context.payment_id)
Orquestador de Saga
El SagaOrchestrator es el corazon de la implementacion. Coordina la ejecucion de steps y maneja el flujo de compensacion cuando algo falla.
Responsabilidades clave:
- Ejecutar steps en secuencia
- Rastrear que steps se completaron
- Ejecutar compensaciones en orden inverso si algo falla
- Mantener el estado general de la saga
El metodo _compensate() usa reversed() para iterar los steps completados en orden inverso, garantizando que las compensaciones se ejecuten LIFO.
# saga/orchestrator.py
from dataclasses import dataclass, field
from typing import Generic, TypeVar
from enum import Enum
import logging
from saga.base import SagaStep, StepResult, StepStatus
logger = logging.getLogger(__name__)
class SagaStatus(Enum):
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATING = "compensating"
T = TypeVar('T')
@dataclass
class SagaState(Generic[T]):
saga_id: str
status: SagaStatus = SagaStatus.RUNNING
context: T = None
completed_steps: list[StepResult] = field(default_factory=list)
error: str | None = None
class SagaOrchestrator(Generic[T]):
def __init__(self, saga_id: str, steps: list[SagaStep[T]]):
self.saga_id = saga_id
self.steps = steps
self.state = SagaState[T](saga_id=saga_id)
async def execute(self, context: T) -> SagaState[T]:
self.state.context = context
self.state.status = SagaStatus.RUNNING
try:
for step in self.steps:
logger.info(f"Executing step: {step.name}")
result = await step.execute(context)
self.state.completed_steps.append(StepResult(
name=step.name,
status=StepStatus.COMPLETED,
result=result
))
self.state.status = SagaStatus.COMPLETED
logger.info(f"Saga {self.saga_id} completed successfully")
except Exception as e:
logger.error(f"Saga {self.saga_id} failed: {e}")
self.state.error = str(e)
self.state.status = SagaStatus.COMPENSATING
await self._compensate()
self.state.status = SagaStatus.FAILED
return self.state
async def _compensate(self) -> None:
logger.info(f"Starting compensation for saga {self.saga_id}")
for step_result in reversed(self.state.completed_steps):
step = next(s for s in self.steps if s.name == step_result.name)
try:
logger.info(f"Compensating step: {step.name}")
await step.compensate(self.state.context)
step_result.status = StepStatus.COMPENSATED
except Exception as e:
logger.error(f"Compensation failed for {step.name}: {e}")
step_result.status = StepStatus.FAILED
step_result.error = str(e)
Servicios
Los servicios encapsulan la logica de negocio y el acceso a datos. Son independientes de la saga - no saben que son parte de una transaccion distribuida.
Esta separacion es importante porque:
- Los servicios pueden probarse independientemente
- La misma logica puede usarse fuera de sagas
- Facilita la migracion a microservicios reales
En produccion, estos servicios se comunicarian con bases de datos y APIs externas en lugar de almacenar datos en memoria.
# services/order.py
from dataclasses import dataclass
from uuid import uuid4
@dataclass
class Order:
id: str
customer_id: str
items: list
total: float
status: str
class OrderService:
def __init__(self):
self.orders: dict[str, Order] = {}
async def create(self, customer_id: str, items: list, total: float) -> Order:
order = Order(
id=str(uuid4()),
customer_id=customer_id,
items=items,
total=total,
status="pending"
)
self.orders[order.id] = order
return order
async def cancel(self, order_id: str) -> None:
if order_id in self.orders:
self.orders[order_id].status = "cancelled"
async def complete(self, order_id: str) -> None:
if order_id in self.orders:
self.orders[order_id].status = "completed"
Uso
El punto de entrada muestra como ensamblar todas las piezas:
- Instanciar servicios (en produccion, estos tendrian conexiones a DB)
- Crear los steps pasando los servicios necesarios
- Preparar el contexto inicial con datos del usuario
- Crear el orquestador con un ID unico y los steps
- Ejecutar la saga y manejar el resultado
asyncio.run() es el punto de entrada para codigo asincrono en Python, creando el event loop y ejecutando la corutina principal.
# main.py
import asyncio
from uuid import uuid4
from saga.base import SagaContext
from saga.orchestrator import SagaOrchestrator
from saga.steps import CreateOrderStep, ReserveStockStep, ProcessPaymentStep
from services.order import OrderService
from services.inventory import InventoryService
from services.payment import PaymentService
async def main():
# Instanciar servicios
order_service = OrderService()
inventory_service = InventoryService()
payment_service = PaymentService()
# Definir steps
steps = [
CreateOrderStep(order_service),
ReserveStockStep(inventory_service),
ProcessPaymentStep(payment_service),
]
# Crear contexto
context = SagaContext(
customer_id="cust-123",
items=[{"product_id": "prod-1", "quantity": 2}],
total=99.99
)
# Ejecutar saga
orchestrator = SagaOrchestrator(
saga_id=str(uuid4()),
steps=steps
)
result = await orchestrator.execute(context)
print(f"Saga status: {result.status}")
print(f"Order ID: {result.context.order_id}")
if __name__ == "__main__":
asyncio.run(main())
Resumen
- SagaStep define execute y compensate
- SagaOrchestrator coordina los pasos
- asyncio permite operaciones concurrentes
- Las compensaciones se ejecutan en orden inverso
Glosario
asyncio
Definicion: Biblioteca estandar de Python para escribir codigo asincrono usando la sintaxis async/await, permitiendo concurrencia sin threads.
Por que es importante: Permite que una saga ejecute multiples operaciones de I/O (llamadas HTTP, consultas DB) sin bloquear, mejorando el rendimiento cuando hay esperas de red.
Ejemplo practico: Mientras esperamos respuesta del servicio de pagos, podemos iniciar la llamada al servicio de notificaciones, reduciendo el tiempo total de la saga.
Abstract Base Class (ABC)
Definicion: Clase en Python que define metodos abstractos que todas las subclases deben implementar, garantizando una interfaz consistente.
Por que es importante: Asegura que todos los steps de saga tengan los metodos execute y compensate, evitando errores en tiempo de ejecucion.
Ejemplo practico: Si creamos un ShippingStep que olvida implementar compensate(), Python lanzara un error al instanciarlo, no cuando intentemos compensar.
dataclass
Definicion: Decorador de Python 3.7+ que genera automaticamente metodos comunes (__init__, __repr__, etc.) para clases que principalmente almacenan datos.
Por que es importante: Reduce el codigo repetitivo y hace las clases de datos mas concisas y legibles, facilitando la definicion de estados y resultados de saga.
Ejemplo practico: @dataclass class Order: id: str; total: float genera automaticamente el constructor Order(id="123", total=99.99).
Generic (TypeVar)
Definicion: Mecanismo de Python para crear clases y funciones que trabajan con tipos parametrizados, permitiendo reutilizacion de codigo con type safety.
Por que es importante: Permite que SagaStep[T] funcione con cualquier tipo de contexto, manteniendo la informacion de tipos para herramientas de analisis estatico.
Ejemplo practico: SagaStep[OrderContext] indica que este step trabaja especificamente con contextos de orden, y el editor puede autocompletar propiedades.
Inyeccion de Dependencias
Definicion: Patron de diseno donde una clase recibe sus dependencias (otros objetos que necesita) a traves del constructor en lugar de crearlas internamente.
Por que es importante: Facilita el testing al permitir inyectar mocks, y desacopla los componentes haciendo el codigo mas modular y mantenible.
Ejemplo practico: CreateOrderStep(order_service) recibe el servicio como parametro. En tests, pasamos un mock; en produccion, el servicio real.
Contexto de Saga
Definicion: Objeto mutable que viaja a traves de todos los steps de una saga, acumulando informacion necesaria para pasos posteriores y compensaciones.
Por que es importante: Permite que cada step acceda a resultados de steps anteriores (como IDs creados) y proporciona toda la informacion necesaria para compensar.
Ejemplo practico: El step de pago necesita el order_id creado por el step anterior. Este ID se almacena en el contexto y esta disponible para todos los steps siguientes.
← Capítulo 15: Go Workflows | Capítulo 17: Celery y Saga Pattern →