Capítulo 17: Celery y Saga Pattern
Capítulo 17: Celery y Saga Pattern
“Celery: tareas distribuidas para sagas resilientes”
Introduccion
El capitulo anterior implemento sagas con asyncio, pero todo se ejecutaba en un solo proceso. Para sistemas distribuidos reales, necesitamos ejecutar tareas en multiples maquinas con colas de mensajes.
Celery es una biblioteca de Python para ejecucion distribuida de tareas. Permite definir tareas que se ejecutan en workers separados, comunicandose a traves de un broker de mensajes.
Esta arquitectura proporciona:
- Escalabilidad: Agrega mas workers segun la demanda
- Resiliencia: Si un worker falla, otro toma la tarea
- Desacoplamiento: El proceso que inicia la saga no necesita esperar a que termine
Configuracion de Celery
La configuracion conecta Celery con sus dependencias:
- Broker: Cola de mensajes donde se colocan las tareas (usamos Redis)
- Backend: Donde se almacenan los resultados de tareas completadas
Los parametros importantes incluyen:
task_acks_late: El worker confirma la tarea solo despues de completarla (no cuando la recibe)worker_prefetch_multiplier=1: Cada worker toma solo una tarea a la vez, mejorando la distribucion
# celery_app.py
from celery import Celery
app = Celery(
'orderflow',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_acks_late=True,
worker_prefetch_multiplier=1,
)
Tasks como Steps de Saga
En Celery, cada step de la saga se convierte en una task (tarea). El decorador @shared_task registra la funcion como tarea ejecutable por workers.
Parametros importantes del decorador:
bind=True: Da acceso aself, permitiendo acceder al ID de tarea y metodos de reintentomax_retries=3: Numero maximo de reintentos automaticos
El metodo self.retry(exc=e, countdown=5) reencola la tarea para reintentarla en 5 segundos. Reject rechaza la tarea permanentemente (sin reintentos).
# tasks/order_tasks.py
from celery import shared_task
from celery.exceptions import Reject
import logging
logger = logging.getLogger(__name__)
@shared_task(bind=True, max_retries=3)
def create_order_task(self, customer_id: str, items: list, total: float) -> dict:
try:
# Simular creación de orden
order_id = f"order-{self.request.id[:8]}"
logger.info(f"Created order: {order_id}")
return {"order_id": order_id, "status": "pending"}
except Exception as e:
logger.error(f"Failed to create order: {e}")
raise self.retry(exc=e, countdown=5)
@shared_task(bind=True)
def cancel_order_task(self, order_id: str) -> dict:
logger.info(f"Cancelling order: {order_id}")
return {"order_id": order_id, "status": "cancelled"}
@shared_task(bind=True, max_retries=3)
def reserve_stock_task(self, order_id: str, items: list) -> dict:
try:
reservation_id = f"res-{order_id[-8:]}"
logger.info(f"Reserved stock: {reservation_id}")
return {"reservation_id": reservation_id}
except Exception as e:
raise self.retry(exc=e, countdown=5)
@shared_task(bind=True)
def release_stock_task(self, reservation_id: str) -> dict:
logger.info(f"Releasing stock: {reservation_id}")
return {"reservation_id": reservation_id, "status": "released"}
@shared_task(bind=True, max_retries=3)
def process_payment_task(self, order_id: str, amount: float) -> dict:
try:
payment_id = f"pay-{order_id[-8:]}"
logger.info(f"Payment processed: {payment_id}")
return {"payment_id": payment_id}
except Exception as e:
raise self.retry(exc=e, countdown=10)
@shared_task(bind=True)
def refund_payment_task(self, payment_id: str) -> dict:
logger.info(f"Refunding payment: {payment_id}")
return {"payment_id": payment_id, "status": "refunded"}
Orquestador con Celery Chain
El orquestador adaptado para Celery mantiene el estado en Redis y ejecuta tareas a traves de la cola.
Flujo de ejecucion:
- Guardar estado inicial en Redis
- Para cada step, llamar
task.delay()que encola la tarea - Esperar resultado con
result.get(timeout=60) - Actualizar contexto y estado
- Si algo falla, ejecutar compensaciones
delay() encola la tarea de forma asincrona. get() bloquea esperando el resultado.
# saga/celery_orchestrator.py
from celery import chain, group
from celery.result import AsyncResult
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable
import logging
import redis
import json
logger = logging.getLogger(__name__)
class SagaStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
COMPENSATING = "compensating"
@dataclass
class SagaStep:
name: str
task: Callable
compensate_task: Callable
args: tuple = ()
kwargs: dict = field(default_factory=dict)
class CelerySagaOrchestrator:
def __init__(self, saga_id: str, redis_client: redis.Redis):
self.saga_id = saga_id
self.redis = redis_client
self.steps: list[SagaStep] = []
self.completed_results: list[dict] = []
def add_step(self, step: SagaStep) -> 'CelerySagaOrchestrator':
self.steps.append(step)
return self
def _save_state(self, status: SagaStatus, results: list = None):
state = {
"saga_id": self.saga_id,
"status": status.value,
"results": results or [],
"steps": [s.name for s in self.steps]
}
self.redis.setex(f"saga:{self.saga_id}", 3600, json.dumps(state))
async def execute(self, initial_data: dict) -> dict:
self._save_state(SagaStatus.RUNNING)
context = initial_data.copy()
try:
for step in self.steps:
logger.info(f"Executing step: {step.name}")
# Ejecutar task
result = step.task.delay(*step.args, **step.kwargs, **context)
task_result = result.get(timeout=60)
# Actualizar contexto
context.update(task_result)
self.completed_results.append({
"step": step.name,
"result": task_result
})
self._save_state(SagaStatus.RUNNING, self.completed_results)
self._save_state(SagaStatus.COMPLETED, self.completed_results)
return {"status": "completed", "context": context}
except Exception as e:
logger.error(f"Saga failed: {e}")
self._save_state(SagaStatus.COMPENSATING)
await self._compensate(context)
self._save_state(SagaStatus.FAILED)
raise
async def _compensate(self, context: dict):
for i in range(len(self.completed_results) - 1, -1, -1):
step = self.steps[i]
step_result = self.completed_results[i]["result"]
try:
logger.info(f"Compensating: {step.name}")
comp_result = step.compensate_task.delay(**step_result)
comp_result.get(timeout=60)
except Exception as e:
logger.error(f"Compensation failed for {step.name}: {e}")
Saga con Celery Canvas
Celery Canvas proporciona primitivas para componer workflows de tareas:
- chain: Ejecuta tareas en secuencia, pasando el resultado de una a la siguiente
- group: Ejecuta tareas en paralelo
- chord: Ejecuta un group y luego una tarea callback con todos los resultados
signature (o .s()) crea una “firma” de tarea - una representacion serializable que puede componerse y ejecutarse despues.
Este enfoque es mas declarativo: defines el flujo una vez y Celery maneja la ejecucion.
# saga/canvas_saga.py
from celery import chain, chord, group, signature
from celery_app import app
from tasks.order_tasks import (
create_order_task, cancel_order_task,
reserve_stock_task, release_stock_task,
process_payment_task, refund_payment_task
)
def create_order_saga(customer_id: str, items: list, total: float):
"""Saga usando Celery Canvas para encadenar tareas."""
workflow = chain(
create_order_task.s(customer_id, items, total),
reserve_stock_task.s(items), # Recibe order_id del paso anterior
process_payment_task.s(total), # Recibe order_id
complete_order_task.s()
)
return workflow.apply_async()
@app.task(bind=True)
def complete_order_task(self, previous_results: dict) -> dict:
"""Task final que marca la orden como completada."""
order_id = previous_results.get("order_id")
return {"order_id": order_id, "status": "completed"}
# Saga con manejo de errores
@app.task(bind=True)
def saga_error_handler(self, task_id: str, saga_id: str):
"""Callback para manejar fallos y ejecutar compensaciones."""
from saga.celery_orchestrator import CelerySagaOrchestrator
import redis
r = redis.Redis()
state = json.loads(r.get(f"saga:{saga_id}"))
# Ejecutar compensaciones según estado guardado
for result in reversed(state.get("results", [])):
step_name = result["step"]
# Mapear step a compensación y ejecutar
Monitoreo con Flower
Flower es una herramienta web de monitoreo en tiempo real para Celery. Proporciona:
- Vista de workers activos y su estado
- Cola de tareas pendientes y en progreso
- Historial de tareas completadas/fallidas
- Graficas de rendimiento
- Capacidad de cancelar tareas o reiniciar workers
# Instalar Flower
pip install flower
# Iniciar monitoreo
celery -A celery_app flower --port=5555
Accede a http://localhost:5555 para ver el dashboard.
Uso Completo
# main.py
import asyncio
import redis
from saga.celery_orchestrator import CelerySagaOrchestrator, SagaStep
from tasks.order_tasks import *
async def main():
r = redis.Redis()
saga_id = "saga-001"
orchestrator = CelerySagaOrchestrator(saga_id, r)
orchestrator.add_step(SagaStep(
name="create_order",
task=create_order_task,
compensate_task=cancel_order_task
)).add_step(SagaStep(
name="reserve_stock",
task=reserve_stock_task,
compensate_task=release_stock_task
)).add_step(SagaStep(
name="process_payment",
task=process_payment_task,
compensate_task=refund_payment_task
))
result = await orchestrator.execute({
"customer_id": "cust-123",
"items": [{"product_id": "p1", "qty": 2}],
"total": 99.99
})
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Resumen
- Celery distribuye tareas entre workers
- Redis como broker y backend de resultados
- Canvas (chain, group, chord) para flujos complejos
- Estado persistido en Redis para recuperacion
- Flower para monitoreo visual
Glosario
Celery
Definicion: Biblioteca de Python para ejecucion distribuida de tareas asincronas, utilizando colas de mensajes para comunicacion entre procesos.
Por que es importante: Permite escalar horizontalmente agregando mas workers, y proporciona reintentos automaticos, scheduling y monitoreo para tareas de larga duracion.
Ejemplo practico: Una saga con 5 steps puede ejecutarse en 5 workers diferentes, cada uno procesando un step mientras los demas estan disponibles para otras sagas.
Broker de Mensajes
Definicion: Sistema intermediario que recibe, almacena y entrega mensajes entre productores y consumidores, desacoplando los componentes del sistema.
Por que es importante: Permite comunicacion asincrona y confiable entre servicios. Si un consumer no esta disponible, los mensajes se almacenan hasta que pueda procesarlos.
Ejemplo practico: Redis actua como broker: el orquestador encola una tarea de pago, Redis la almacena, y cuando un worker esta libre, la toma y procesa.
Worker (Celery)
Definicion: Proceso que escucha una cola de tareas y ejecuta las tareas que recibe del broker.
Por que es importante: Los workers son los que ejecutan el trabajo real. Puedes tener multiples workers en diferentes maquinas para escalabilidad y redundancia.
Ejemplo practico: celery -A app worker --concurrency=4 inicia un worker con 4 hilos, capaz de procesar 4 tareas simultaneamente.
Celery Canvas
Definicion: API de Celery para componer flujos de trabajo complejos usando primitivas como chain (secuencia), group (paralelo) y chord (paralelo + callback).
Por que es importante: Permite expresar flujos de saga de forma declarativa y clara, delegando a Celery la coordinacion de la ejecucion.
Ejemplo practico: chain(crear_orden.s(), reservar_stock.s(), procesar_pago.s()) define una saga donde cada tarea pasa su resultado a la siguiente.
Task Acknowledgement (acks_late)
Definicion: Configuracion que determina cuando un worker confirma al broker que recibio una tarea: inmediatamente o despues de completarla.
Por que es importante: Con acks_late=True, si un worker falla durante la ejecucion, la tarea se reencola automaticamente porque nunca fue confirmada.
Ejemplo practico: Un worker recibe una tarea de pago, comienza a procesarla, pero el servidor se reinicia. Con acks_late, otro worker recibe la misma tarea y la completa.
Flower
Definicion: Herramienta web de monitoreo en tiempo real para Celery que proporciona visibilidad sobre workers, tareas y rendimiento.
Por que es importante: Permite diagnosticar problemas, ver tareas fallidas, monitorear colas y entender el comportamiento del sistema en produccion.
Ejemplo practico: En Flower ves que la cola tiene 500 tareas pendientes pero solo 2 workers activos, indicando que necesitas escalar.
Countdown (Retry)
Definicion: Tiempo en segundos que Celery espera antes de reintentar una tarea fallida.
Por que es importante: Permite implementar backoff (espera incremental), dando tiempo a servicios externos para recuperarse antes de reintentar.
Ejemplo practico: self.retry(countdown=5) espera 5 segundos. Con backoff exponencial seria countdown=2**self.request.retries (2, 4, 8 segundos).