← Volver al listado de tecnologías

Capítulo 17: Celery y Saga Pattern

Por: SiempreListo
sagapythonceleryredistareas

Capítulo 17: Celery y Saga Pattern

“Celery: tareas distribuidas para sagas resilientes”

Introduccion

El capitulo anterior implemento sagas con asyncio, pero todo se ejecutaba en un solo proceso. Para sistemas distribuidos reales, necesitamos ejecutar tareas en multiples maquinas con colas de mensajes.

Celery es una biblioteca de Python para ejecucion distribuida de tareas. Permite definir tareas que se ejecutan en workers separados, comunicandose a traves de un broker de mensajes.

Esta arquitectura proporciona:

Configuracion de Celery

La configuracion conecta Celery con sus dependencias:

Los parametros importantes incluyen:

# celery_app.py
from celery import Celery

app = Celery(
    'orderflow',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

Tasks como Steps de Saga

En Celery, cada step de la saga se convierte en una task (tarea). El decorador @shared_task registra la funcion como tarea ejecutable por workers.

Parametros importantes del decorador:

El metodo self.retry(exc=e, countdown=5) reencola la tarea para reintentarla en 5 segundos. Reject rechaza la tarea permanentemente (sin reintentos).

# tasks/order_tasks.py
from celery import shared_task
from celery.exceptions import Reject
import logging

logger = logging.getLogger(__name__)

@shared_task(bind=True, max_retries=3)
def create_order_task(self, customer_id: str, items: list, total: float) -> dict:
    try:
        # Simular creación de orden
        order_id = f"order-{self.request.id[:8]}"
        logger.info(f"Created order: {order_id}")
        return {"order_id": order_id, "status": "pending"}
    except Exception as e:
        logger.error(f"Failed to create order: {e}")
        raise self.retry(exc=e, countdown=5)

@shared_task(bind=True)
def cancel_order_task(self, order_id: str) -> dict:
    logger.info(f"Cancelling order: {order_id}")
    return {"order_id": order_id, "status": "cancelled"}

@shared_task(bind=True, max_retries=3)
def reserve_stock_task(self, order_id: str, items: list) -> dict:
    try:
        reservation_id = f"res-{order_id[-8:]}"
        logger.info(f"Reserved stock: {reservation_id}")
        return {"reservation_id": reservation_id}
    except Exception as e:
        raise self.retry(exc=e, countdown=5)

@shared_task(bind=True)
def release_stock_task(self, reservation_id: str) -> dict:
    logger.info(f"Releasing stock: {reservation_id}")
    return {"reservation_id": reservation_id, "status": "released"}

@shared_task(bind=True, max_retries=3)
def process_payment_task(self, order_id: str, amount: float) -> dict:
    try:
        payment_id = f"pay-{order_id[-8:]}"
        logger.info(f"Payment processed: {payment_id}")
        return {"payment_id": payment_id}
    except Exception as e:
        raise self.retry(exc=e, countdown=10)

@shared_task(bind=True)
def refund_payment_task(self, payment_id: str) -> dict:
    logger.info(f"Refunding payment: {payment_id}")
    return {"payment_id": payment_id, "status": "refunded"}

Orquestador con Celery Chain

El orquestador adaptado para Celery mantiene el estado en Redis y ejecuta tareas a traves de la cola.

Flujo de ejecucion:

  1. Guardar estado inicial en Redis
  2. Para cada step, llamar task.delay() que encola la tarea
  3. Esperar resultado con result.get(timeout=60)
  4. Actualizar contexto y estado
  5. Si algo falla, ejecutar compensaciones

delay() encola la tarea de forma asincrona. get() bloquea esperando el resultado.

# saga/celery_orchestrator.py
from celery import chain, group
from celery.result import AsyncResult
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
import logging
import redis
import json

logger = logging.getLogger(__name__)

class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATING = "compensating"

@dataclass
class SagaStep:
    name: str
    task: Callable
    compensate_task: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

class CelerySagaOrchestrator:
    def __init__(self, saga_id: str, redis_client: redis.Redis):
        self.saga_id = saga_id
        self.redis = redis_client
        self.steps: list[SagaStep] = []
        self.completed_results: list[dict] = []

    def add_step(self, step: SagaStep) -> 'CelerySagaOrchestrator':
        self.steps.append(step)
        return self

    def _save_state(self, status: SagaStatus, results: list = None):
        state = {
            "saga_id": self.saga_id,
            "status": status.value,
            "results": results or [],
            "steps": [s.name for s in self.steps]
        }
        self.redis.setex(f"saga:{self.saga_id}", 3600, json.dumps(state))

    async def execute(self, initial_data: dict) -> dict:
        self._save_state(SagaStatus.RUNNING)
        context = initial_data.copy()

        try:
            for step in self.steps:
                logger.info(f"Executing step: {step.name}")

                # Ejecutar task
                result = step.task.delay(*step.args, **step.kwargs, **context)
                task_result = result.get(timeout=60)

                # Actualizar contexto
                context.update(task_result)
                self.completed_results.append({
                    "step": step.name,
                    "result": task_result
                })

                self._save_state(SagaStatus.RUNNING, self.completed_results)

            self._save_state(SagaStatus.COMPLETED, self.completed_results)
            return {"status": "completed", "context": context}

        except Exception as e:
            logger.error(f"Saga failed: {e}")
            self._save_state(SagaStatus.COMPENSATING)
            await self._compensate(context)
            self._save_state(SagaStatus.FAILED)
            raise

    async def _compensate(self, context: dict):
        for i in range(len(self.completed_results) - 1, -1, -1):
            step = self.steps[i]
            step_result = self.completed_results[i]["result"]

            try:
                logger.info(f"Compensating: {step.name}")
                comp_result = step.compensate_task.delay(**step_result)
                comp_result.get(timeout=60)
            except Exception as e:
                logger.error(f"Compensation failed for {step.name}: {e}")

Saga con Celery Canvas

Celery Canvas proporciona primitivas para componer workflows de tareas:

signature (o .s()) crea una “firma” de tarea - una representacion serializable que puede componerse y ejecutarse despues.

Este enfoque es mas declarativo: defines el flujo una vez y Celery maneja la ejecucion.

# saga/canvas_saga.py
from celery import chain, chord, group, signature
from celery_app import app
from tasks.order_tasks import (
    create_order_task, cancel_order_task,
    reserve_stock_task, release_stock_task,
    process_payment_task, refund_payment_task
)

def create_order_saga(customer_id: str, items: list, total: float):
    """Saga usando Celery Canvas para encadenar tareas."""

    workflow = chain(
        create_order_task.s(customer_id, items, total),
        reserve_stock_task.s(items),  # Recibe order_id del paso anterior
        process_payment_task.s(total),  # Recibe order_id
        complete_order_task.s()
    )

    return workflow.apply_async()

@app.task(bind=True)
def complete_order_task(self, previous_results: dict) -> dict:
    """Task final que marca la orden como completada."""
    order_id = previous_results.get("order_id")
    return {"order_id": order_id, "status": "completed"}

# Saga con manejo de errores
@app.task(bind=True)
def saga_error_handler(self, task_id: str, saga_id: str):
    """Callback para manejar fallos y ejecutar compensaciones."""
    from saga.celery_orchestrator import CelerySagaOrchestrator
    import redis

    r = redis.Redis()
    state = json.loads(r.get(f"saga:{saga_id}"))

    # Ejecutar compensaciones según estado guardado
    for result in reversed(state.get("results", [])):
        step_name = result["step"]
        # Mapear step a compensación y ejecutar

Monitoreo con Flower

Flower es una herramienta web de monitoreo en tiempo real para Celery. Proporciona:

# Instalar Flower
pip install flower

# Iniciar monitoreo
celery -A celery_app flower --port=5555

Accede a http://localhost:5555 para ver el dashboard.

Uso Completo

# main.py
import asyncio
import redis
from saga.celery_orchestrator import CelerySagaOrchestrator, SagaStep
from tasks.order_tasks import *

async def main():
    r = redis.Redis()
    saga_id = "saga-001"

    orchestrator = CelerySagaOrchestrator(saga_id, r)
    orchestrator.add_step(SagaStep(
        name="create_order",
        task=create_order_task,
        compensate_task=cancel_order_task
    )).add_step(SagaStep(
        name="reserve_stock",
        task=reserve_stock_task,
        compensate_task=release_stock_task
    )).add_step(SagaStep(
        name="process_payment",
        task=process_payment_task,
        compensate_task=refund_payment_task
    ))

    result = await orchestrator.execute({
        "customer_id": "cust-123",
        "items": [{"product_id": "p1", "qty": 2}],
        "total": 99.99
    })

    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Resumen

Glosario

Celery

Definicion: Biblioteca de Python para ejecucion distribuida de tareas asincronas, utilizando colas de mensajes para comunicacion entre procesos.

Por que es importante: Permite escalar horizontalmente agregando mas workers, y proporciona reintentos automaticos, scheduling y monitoreo para tareas de larga duracion.

Ejemplo practico: Una saga con 5 steps puede ejecutarse en 5 workers diferentes, cada uno procesando un step mientras los demas estan disponibles para otras sagas.


Broker de Mensajes

Definicion: Sistema intermediario que recibe, almacena y entrega mensajes entre productores y consumidores, desacoplando los componentes del sistema.

Por que es importante: Permite comunicacion asincrona y confiable entre servicios. Si un consumer no esta disponible, los mensajes se almacenan hasta que pueda procesarlos.

Ejemplo practico: Redis actua como broker: el orquestador encola una tarea de pago, Redis la almacena, y cuando un worker esta libre, la toma y procesa.


Worker (Celery)

Definicion: Proceso que escucha una cola de tareas y ejecuta las tareas que recibe del broker.

Por que es importante: Los workers son los que ejecutan el trabajo real. Puedes tener multiples workers en diferentes maquinas para escalabilidad y redundancia.

Ejemplo practico: celery -A app worker --concurrency=4 inicia un worker con 4 hilos, capaz de procesar 4 tareas simultaneamente.


Celery Canvas

Definicion: API de Celery para componer flujos de trabajo complejos usando primitivas como chain (secuencia), group (paralelo) y chord (paralelo + callback).

Por que es importante: Permite expresar flujos de saga de forma declarativa y clara, delegando a Celery la coordinacion de la ejecucion.

Ejemplo practico: chain(crear_orden.s(), reservar_stock.s(), procesar_pago.s()) define una saga donde cada tarea pasa su resultado a la siguiente.


Task Acknowledgement (acks_late)

Definicion: Configuracion que determina cuando un worker confirma al broker que recibio una tarea: inmediatamente o despues de completarla.

Por que es importante: Con acks_late=True, si un worker falla durante la ejecucion, la tarea se reencola automaticamente porque nunca fue confirmada.

Ejemplo practico: Un worker recibe una tarea de pago, comienza a procesarla, pero el servidor se reinicia. Con acks_late, otro worker recibe la misma tarea y la completa.


Flower

Definicion: Herramienta web de monitoreo en tiempo real para Celery que proporciona visibilidad sobre workers, tareas y rendimiento.

Por que es importante: Permite diagnosticar problemas, ver tareas fallidas, monitorear colas y entender el comportamiento del sistema en produccion.

Ejemplo practico: En Flower ves que la cola tiene 500 tareas pendientes pero solo 2 workers activos, indicando que necesitas escalar.


Countdown (Retry)

Definicion: Tiempo en segundos que Celery espera antes de reintentar una tarea fallida.

Por que es importante: Permite implementar backoff (espera incremental), dando tiempo a servicios externos para recuperarse antes de reintentar.

Ejemplo practico: self.retry(countdown=5) espera 5 segundos. Con backoff exponencial seria countdown=2**self.request.retries (2, 4, 8 segundos).


← Capítulo 16: Python Sagas | Capítulo 18: RabbitMQ →