← Volver al listado de tecnologías

Capítulo 16: Sagas en Python

Por: SiempreListo
sagapythonasynciomicroservicios

Capítulo 16: Sagas en Python

“Python y asyncio: sagas elegantes y legibles”

Introduccion

Despues de explorar Go con Temporal, ahora implementaremos sagas en Python usando su biblioteca nativa de programacion asincrona: asyncio.

Python es conocido por su sintaxis clara y legible, lo que lo hace ideal para expresar flujos de negocio complejos de manera comprensible. Con asyncio, podemos ejecutar multiples operaciones de I/O concurrentemente sin necesidad de threads.

Esta implementacion es “manual” - construimos nuestra propia infraestructura de saga. Es util para entender los conceptos fundamentales y para sistemas donde Temporal u otras plataformas no son viables.

Arquitectura Base

La estructura del proyecto sigue el principio de separacion de responsabilidades:

orderflow/
├── saga/
│   ├── __init__.py
│   ├── base.py
│   ├── steps.py
│   └── orchestrator.py
├── services/
│   ├── order.py
│   ├── inventory.py
│   └── payment.py
└── main.py

Definicion Base de Saga

Comenzamos definiendo las abstracciones base. En Python usamos:

La clase SagaStep es generica sobre T, lo que significa que puede trabajar con cualquier tipo de contexto que definamos.

# saga/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar
from enum import Enum
import asyncio

class StepStatus(Enum):
    PENDING = "pending"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"

@dataclass
class StepResult:
    name: str
    status: StepStatus
    result: Any = None
    error: str | None = None

T = TypeVar('T')

class SagaStep(ABC, Generic[T]):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    async def execute(self, context: T) -> Any:
        pass

    @abstractmethod
    async def compensate(self, context: T) -> None:
        pass

@dataclass
class SagaContext:
    order_id: str | None = None
    customer_id: str = ""
    items: list = field(default_factory=list)
    total: float = 0.0
    reservation_id: str | None = None
    payment_id: str | None = None
    shipment_id: str | None = None

Steps de la Saga

Cada step implementa la interfaz SagaStep con dos metodos:

  1. execute(): Realiza la operacion (crear orden, reservar stock, etc.)
  2. compensate(): Deshace la operacion si es necesario

Los steps reciben los servicios que necesitan via inyeccion de dependencias en el constructor. Esto facilita el testing al permitir inyectar mocks.

El contexto (SagaContext) se va enriqueciendo a medida que cada step agrega informacion (IDs creados, resultados, etc.).

# saga/steps.py
from saga.base import SagaStep, SagaContext
from services.order import OrderService
from services.inventory import InventoryService
from services.payment import PaymentService

class CreateOrderStep(SagaStep[SagaContext]):
    def __init__(self, order_service: OrderService):
        super().__init__("create_order")
        self.order_service = order_service

    async def execute(self, context: SagaContext) -> str:
        order = await self.order_service.create(
            customer_id=context.customer_id,
            items=context.items,
            total=context.total
        )
        context.order_id = order.id
        return order.id

    async def compensate(self, context: SagaContext) -> None:
        if context.order_id:
            await self.order_service.cancel(context.order_id)

class ReserveStockStep(SagaStep[SagaContext]):
    def __init__(self, inventory_service: InventoryService):
        super().__init__("reserve_stock")
        self.inventory_service = inventory_service

    async def execute(self, context: SagaContext) -> str:
        reservation = await self.inventory_service.reserve(
            order_id=context.order_id,
            items=context.items
        )
        context.reservation_id = reservation.id
        return reservation.id

    async def compensate(self, context: SagaContext) -> None:
        if context.reservation_id:
            await self.inventory_service.release(context.reservation_id)

class ProcessPaymentStep(SagaStep[SagaContext]):
    def __init__(self, payment_service: PaymentService):
        super().__init__("process_payment")
        self.payment_service = payment_service

    async def execute(self, context: SagaContext) -> str:
        payment = await self.payment_service.process(
            order_id=context.order_id,
            amount=context.total
        )
        context.payment_id = payment.id
        return payment.id

    async def compensate(self, context: SagaContext) -> None:
        if context.payment_id:
            await self.payment_service.refund(context.payment_id)

Orquestador de Saga

El SagaOrchestrator es el corazon de la implementacion. Coordina la ejecucion de steps y maneja el flujo de compensacion cuando algo falla.

Responsabilidades clave:

  1. Ejecutar steps en secuencia
  2. Rastrear que steps se completaron
  3. Ejecutar compensaciones en orden inverso si algo falla
  4. Mantener el estado general de la saga

El metodo _compensate() usa reversed() para iterar los steps completados en orden inverso, garantizando que las compensaciones se ejecuten LIFO.

# saga/orchestrator.py
from dataclasses import dataclass, field
from typing import Generic, TypeVar
from enum import Enum
import logging

from saga.base import SagaStep, StepResult, StepStatus

logger = logging.getLogger(__name__)

class SagaStatus(Enum):
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"

T = TypeVar('T')

@dataclass
class SagaState(Generic[T]):
    saga_id: str
    status: SagaStatus = SagaStatus.RUNNING
    context: T = None
    completed_steps: list[StepResult] = field(default_factory=list)
    error: str | None = None

class SagaOrchestrator(Generic[T]):
    def __init__(self, saga_id: str, steps: list[SagaStep[T]]):
        self.saga_id = saga_id
        self.steps = steps
        self.state = SagaState[T](saga_id=saga_id)

    async def execute(self, context: T) -> SagaState[T]:
        self.state.context = context
        self.state.status = SagaStatus.RUNNING

        try:
            for step in self.steps:
                logger.info(f"Executing step: {step.name}")
                result = await step.execute(context)

                self.state.completed_steps.append(StepResult(
                    name=step.name,
                    status=StepStatus.COMPLETED,
                    result=result
                ))

            self.state.status = SagaStatus.COMPLETED
            logger.info(f"Saga {self.saga_id} completed successfully")

        except Exception as e:
            logger.error(f"Saga {self.saga_id} failed: {e}")
            self.state.error = str(e)
            self.state.status = SagaStatus.COMPENSATING
            await self._compensate()
            self.state.status = SagaStatus.FAILED

        return self.state

    async def _compensate(self) -> None:
        logger.info(f"Starting compensation for saga {self.saga_id}")

        for step_result in reversed(self.state.completed_steps):
            step = next(s for s in self.steps if s.name == step_result.name)

            try:
                logger.info(f"Compensating step: {step.name}")
                await step.compensate(self.state.context)
                step_result.status = StepStatus.COMPENSATED
            except Exception as e:
                logger.error(f"Compensation failed for {step.name}: {e}")
                step_result.status = StepStatus.FAILED
                step_result.error = str(e)

Servicios

Los servicios encapsulan la logica de negocio y el acceso a datos. Son independientes de la saga - no saben que son parte de una transaccion distribuida.

Esta separacion es importante porque:

En produccion, estos servicios se comunicarian con bases de datos y APIs externas en lugar de almacenar datos en memoria.

# services/order.py
from dataclasses import dataclass
from uuid import uuid4

@dataclass
class Order:
    id: str
    customer_id: str
    items: list
    total: float
    status: str

class OrderService:
    def __init__(self):
        self.orders: dict[str, Order] = {}

    async def create(self, customer_id: str, items: list, total: float) -> Order:
        order = Order(
            id=str(uuid4()),
            customer_id=customer_id,
            items=items,
            total=total,
            status="pending"
        )
        self.orders[order.id] = order
        return order

    async def cancel(self, order_id: str) -> None:
        if order_id in self.orders:
            self.orders[order_id].status = "cancelled"

    async def complete(self, order_id: str) -> None:
        if order_id in self.orders:
            self.orders[order_id].status = "completed"

Uso

El punto de entrada muestra como ensamblar todas las piezas:

  1. Instanciar servicios (en produccion, estos tendrian conexiones a DB)
  2. Crear los steps pasando los servicios necesarios
  3. Preparar el contexto inicial con datos del usuario
  4. Crear el orquestador con un ID unico y los steps
  5. Ejecutar la saga y manejar el resultado

asyncio.run() es el punto de entrada para codigo asincrono en Python, creando el event loop y ejecutando la corutina principal.

# main.py
import asyncio
from uuid import uuid4

from saga.base import SagaContext
from saga.orchestrator import SagaOrchestrator
from saga.steps import CreateOrderStep, ReserveStockStep, ProcessPaymentStep
from services.order import OrderService
from services.inventory import InventoryService
from services.payment import PaymentService

async def main():
    # Instanciar servicios
    order_service = OrderService()
    inventory_service = InventoryService()
    payment_service = PaymentService()

    # Definir steps
    steps = [
        CreateOrderStep(order_service),
        ReserveStockStep(inventory_service),
        ProcessPaymentStep(payment_service),
    ]

    # Crear contexto
    context = SagaContext(
        customer_id="cust-123",
        items=[{"product_id": "prod-1", "quantity": 2}],
        total=99.99
    )

    # Ejecutar saga
    orchestrator = SagaOrchestrator(
        saga_id=str(uuid4()),
        steps=steps
    )

    result = await orchestrator.execute(context)
    print(f"Saga status: {result.status}")
    print(f"Order ID: {result.context.order_id}")

if __name__ == "__main__":
    asyncio.run(main())

Resumen

Glosario

asyncio

Definicion: Biblioteca estandar de Python para escribir codigo asincrono usando la sintaxis async/await, permitiendo concurrencia sin threads.

Por que es importante: Permite que una saga ejecute multiples operaciones de I/O (llamadas HTTP, consultas DB) sin bloquear, mejorando el rendimiento cuando hay esperas de red.

Ejemplo practico: Mientras esperamos respuesta del servicio de pagos, podemos iniciar la llamada al servicio de notificaciones, reduciendo el tiempo total de la saga.


Abstract Base Class (ABC)

Definicion: Clase en Python que define metodos abstractos que todas las subclases deben implementar, garantizando una interfaz consistente.

Por que es importante: Asegura que todos los steps de saga tengan los metodos execute y compensate, evitando errores en tiempo de ejecucion.

Ejemplo practico: Si creamos un ShippingStep que olvida implementar compensate(), Python lanzara un error al instanciarlo, no cuando intentemos compensar.


dataclass

Definicion: Decorador de Python 3.7+ que genera automaticamente metodos comunes (__init__, __repr__, etc.) para clases que principalmente almacenan datos.

Por que es importante: Reduce el codigo repetitivo y hace las clases de datos mas concisas y legibles, facilitando la definicion de estados y resultados de saga.

Ejemplo practico: @dataclass class Order: id: str; total: float genera automaticamente el constructor Order(id="123", total=99.99).


Generic (TypeVar)

Definicion: Mecanismo de Python para crear clases y funciones que trabajan con tipos parametrizados, permitiendo reutilizacion de codigo con type safety.

Por que es importante: Permite que SagaStep[T] funcione con cualquier tipo de contexto, manteniendo la informacion de tipos para herramientas de analisis estatico.

Ejemplo practico: SagaStep[OrderContext] indica que este step trabaja especificamente con contextos de orden, y el editor puede autocompletar propiedades.


Inyeccion de Dependencias

Definicion: Patron de diseno donde una clase recibe sus dependencias (otros objetos que necesita) a traves del constructor en lugar de crearlas internamente.

Por que es importante: Facilita el testing al permitir inyectar mocks, y desacopla los componentes haciendo el codigo mas modular y mantenible.

Ejemplo practico: CreateOrderStep(order_service) recibe el servicio como parametro. En tests, pasamos un mock; en produccion, el servicio real.


Contexto de Saga

Definicion: Objeto mutable que viaja a traves de todos los steps de una saga, acumulando informacion necesaria para pasos posteriores y compensaciones.

Por que es importante: Permite que cada step acceda a resultados de steps anteriores (como IDs creados) y proporciona toda la informacion necesaria para compensar.

Ejemplo practico: El step de pago necesita el order_id creado por el step anterior. Este ID se almacena en el contexto y esta disponible para todos los steps siguientes.


← Capítulo 15: Go Workflows | Capítulo 17: Celery y Saga Pattern →