← Volver al listado de tecnologías

Capítulo 12: Saga Orchestrator - Implementación

Por: SiempreListo
sagaorchestratorimplementacióntypescript

Capítulo 12: Saga Orchestrator - Implementación

“Donde todo se une”

Introducción

Este capítulo implementa el motor del orquestador diseñado en el capítulo anterior. El SagaEngine es el corazón del sistema, responsable de ejecutar sagas paso a paso y coordinar las compensaciones cuando algo falla.

Los componentes principales son:

Motor de Saga

packages/saga-orchestrator/src/engine/saga-engine.ts

import type { SagaDefinition, SagaInstance, SagaContext, StepResult, SagaStatus } from '../types.js';
import type { SagaRepository } from '../repository/saga-repository.js';
import type { ServiceClient } from '../client/service-clients.js';
import { StepExecutor } from './step-executor.js';

export class SagaEngine {
  private definitions = new Map<string, SagaDefinition>();
  private clients = new Map<string, ServiceClient>();

  constructor(
    private repository: SagaRepository,
    private stepExecutor: StepExecutor
  ) {}

  registerDefinition(definition: SagaDefinition): void {
    this.definitions.set(definition.name, definition);
  }

  registerClient(serviceName: string, client: ServiceClient): void {
    this.clients.set(serviceName, client);
  }

  async startSaga(definitionName: string, context: SagaContext): Promise<SagaInstance> {
    const definition = this.definitions.get(definitionName);
    if (!definition) {
      throw new Error(`Unknown saga definition: ${definitionName}`);
    }

    const saga: SagaInstance = {
      id: crypto.randomUUID(),
      definitionName,
      orderId: context.orderId,
      status: 'started',
      currentStepIndex: 0,
      context,
      stepResults: definition.steps.map(step => ({
        stepName: step.name,
        status: 'pending',
        executedAt: null,
        completedAt: null,
        result: null,
        error: null
      })),
      startedAt: new Date(),
      completedAt: null,
      error: null
    };

    await this.repository.create(saga);
    return this.executeSaga(saga);
  }

  async executeSaga(saga: SagaInstance): Promise<SagaInstance> {
    const definition = this.definitions.get(saga.definitionName)!;

    while (saga.currentStepIndex < definition.steps.length) {
      const stepDef = definition.steps[saga.currentStepIndex];
      const client = this.clients.get(stepDef.service);

      if (!client) {
        throw new Error(`No client for service: ${stepDef.service}`);
      }

      saga.status = 'step_executing';
      await this.repository.update(saga);

      try {
        const result = await this.stepExecutor.execute(stepDef, client, saga.context);

        // Actualizar contexto con resultado
        this.updateContext(saga.context, stepDef.name, result);

        // Marcar paso como completado
        saga.stepResults[saga.currentStepIndex] = {
          stepName: stepDef.name,
          status: 'completed',
          executedAt: new Date(),
          completedAt: new Date(),
          result,
          error: null
        };

        saga.currentStepIndex++;
        saga.status = 'step_completed';
        await this.repository.update(saga);

      } catch (error) {
        saga.stepResults[saga.currentStepIndex] = {
          stepName: stepDef.name,
          status: 'failed',
          executedAt: new Date(),
          completedAt: null,
          result: null,
          error: (error as Error).message
        };

        saga.error = (error as Error).message;
        await this.repository.update(saga);

        // Iniciar compensación
        return this.compensateSaga(saga);
      }
    }

    // Saga completada exitosamente
    saga.status = 'completed';
    saga.completedAt = new Date();
    await this.repository.update(saga);

    return saga;
  }

  async compensateSaga(saga: SagaInstance): Promise<SagaInstance> {
    const definition = this.definitions.get(saga.definitionName)!;
    saga.status = 'compensating';
    await this.repository.update(saga);

    // Compensar en orden inverso
    for (let i = saga.currentStepIndex - 1; i >= 0; i--) {
      const stepDef = definition.steps[i];
      const stepResult = saga.stepResults[i];

      if (stepResult.status !== 'completed' || stepDef.compensationAction === 'none') {
        continue;
      }

      const client = this.clients.get(stepDef.service)!;

      try {
        await client.compensate(stepDef.compensationAction, saga.context);
        saga.stepResults[i].status = 'compensated';
        await this.repository.update(saga);
      } catch (error) {
        console.error(`Compensation failed for ${stepDef.name}:`, error);
        saga.status = 'compensation_failed';
        saga.error = `Compensation failed at ${stepDef.name}: ${(error as Error).message}`;
        await this.repository.update(saga);
        return saga;
      }
    }

    saga.status = 'compensated';
    saga.completedAt = new Date();
    await this.repository.update(saga);

    return saga;
  }

  private updateContext(context: SagaContext, stepName: string, result: unknown): void {
    const r = result as Record<string, unknown>;

    switch (stepName) {
      case 'createOrder':
        context.orderId = r.id as string;
        break;
      case 'reserveStock':
        context.reservationId = r.reservationId as string;
        break;
      case 'processPayment':
        context.paymentId = r.paymentId as string;
        break;
    }
  }

  async resumePendingSagas(): Promise<void> {
    const pending = await this.repository.findPendingSagas();

    for (const saga of pending) {
      console.log(`Resuming saga ${saga.id}`);
      try {
        if (saga.status === 'compensating') {
          await this.compensateSaga(saga);
        } else {
          await this.executeSaga(saga);
        }
      } catch (error) {
        console.error(`Failed to resume saga ${saga.id}:`, error);
      }
    }
  }
}

Ejecutor de Pasos

El StepExecutor encapsula la lógica de ejecutar un paso individual. Se encarga de:

Esta separación permite que el SagaEngine se enfoque en la coordinación general, delegando los detalles de ejecución al StepExecutor.

packages/saga-orchestrator/src/engine/step-executor.ts

import type { SagaStepDefinition, SagaContext } from '../types.js';
import type { ServiceClient } from '../client/service-clients.js';

export class StepExecutor {
  async execute(
    stepDef: SagaStepDefinition,
    client: ServiceClient,
    context: SagaContext
  ): Promise<unknown> {
    const payload = this.buildPayload(stepDef.name, context);
    let lastError: Error | null = null;

    for (let attempt = 1; attempt <= stepDef.retryPolicy.maxAttempts; attempt++) {
      try {
        return await this.executeWithTimeout(
          () => client.execute(stepDef.action, payload),
          stepDef.timeout
        );
      } catch (error) {
        lastError = error as Error;

        if (attempt < stepDef.retryPolicy.maxAttempts) {
          const delay = Math.min(
            stepDef.retryPolicy.backoffMs * Math.pow(2, attempt - 1),
            stepDef.retryPolicy.maxBackoffMs
          );
          console.log(`Step ${stepDef.name} failed, retry ${attempt} in ${delay}ms`);
          await this.sleep(delay);
        }
      }
    }

    throw lastError;
  }

  private buildPayload(stepName: string, context: SagaContext): unknown {
    switch (stepName) {
      case 'createOrder':
        return {
          customerId: context.customerId,
          items: context.items,
          shippingAddress: context.shippingAddress
        };
      case 'reserveStock':
        return {
          orderId: context.orderId,
          items: context.items.map(i => ({
            productId: i.productId,
            quantity: i.quantity
          }))
        };
      case 'processPayment':
        return {
          orderId: context.orderId,
          customerId: context.customerId,
          amount: context.total
        };
      case 'completeOrder':
        return { orderId: context.orderId };
      default:
        return context;
    }
  }

  private async executeWithTimeout<T>(
    operation: () => Promise<T>,
    timeoutMs: number
  ): Promise<T> {
    return Promise.race([
      operation(),
      new Promise<never>((_, reject) =>
        setTimeout(() => reject(new Error('Step timeout')), timeoutMs)
      )
    ]);
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Servicio del Orquestador

El OrchestratorService es la capa de aplicación que expone las operaciones del orquestador. Actúa como fachada, simplificando el uso del motor para los consumidores (API, otros servicios).

packages/saga-orchestrator/src/service/orchestrator-service.ts

import { SagaEngine } from '../engine/saga-engine.js';
import { SagaRepository } from '../repository/saga-repository.js';
import type { SagaContext, SagaInstance } from '../types.js';

export interface CreateOrderRequest {
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    unitPrice: number;
  }>;
  shippingAddress: {
    street: string;
    city: string;
    country: string;
    zipCode: string;
  };
}

export class OrchestratorService {
  constructor(
    private engine: SagaEngine,
    private repository: SagaRepository
  ) {}

  async createOrder(request: CreateOrderRequest): Promise<SagaInstance> {
    const context: SagaContext = {
      orderId: crypto.randomUUID(),
      customerId: request.customerId,
      items: request.items,
      total: request.items.reduce((sum, i) => sum + i.unitPrice * i.quantity, 0),
      shippingAddress: request.shippingAddress
    };

    return this.engine.startSaga('create-order', context);
  }

  async getSagaStatus(sagaId: string): Promise<SagaInstance | null> {
    return this.repository.findById(sagaId);
  }

  async getSagaByOrder(orderId: string): Promise<SagaInstance | null> {
    return this.repository.findByOrderId(orderId);
  }

  async retryCompensation(sagaId: string): Promise<SagaInstance> {
    const saga = await this.repository.findById(sagaId);
    if (!saga) {
      throw new Error('Saga not found');
    }

    if (saga.status !== 'compensation_failed') {
      throw new Error('Saga is not in compensation_failed status');
    }

    return this.engine.compensateSaga(saga);
  }
}

API del Orquestador

packages/saga-orchestrator/src/api/routes.ts

import { Hono } from 'hono';
import { z } from 'zod';
import { zValidator } from '@hono/zod-validator';
import { OrchestratorService } from '../service/orchestrator-service.js';

const CreateOrderSchema = z.object({
  customerId: z.string().uuid(),
  items: z.array(z.object({
    productId: z.string().uuid(),
    quantity: z.number().int().positive(),
    unitPrice: z.number().positive()
  })).min(1),
  shippingAddress: z.object({
    street: z.string(),
    city: z.string(),
    country: z.string(),
    zipCode: z.string()
  })
});

export function createRoutes(service: OrchestratorService): Hono {
  const app = new Hono();

  app.post('/sagas/order', zValidator('json', CreateOrderSchema), async (c) => {
    const request = c.req.valid('json');
    try {
      const saga = await service.createOrder(request);
      return c.json({
        sagaId: saga.id,
        orderId: saga.orderId,
        status: saga.status
      }, 201);
    } catch (error) {
      return c.json({ error: (error as Error).message }, 500);
    }
  });

  app.get('/sagas/:sagaId', async (c) => {
    const saga = await service.getSagaStatus(c.req.param('sagaId'));
    if (!saga) {
      return c.json({ error: 'Saga not found' }, 404);
    }
    return c.json(saga);
  });

  app.get('/sagas/order/:orderId', async (c) => {
    const saga = await service.getSagaByOrder(c.req.param('orderId'));
    if (!saga) {
      return c.json({ error: 'Saga not found' }, 404);
    }
    return c.json(saga);
  });

  app.post('/sagas/:sagaId/retry-compensation', async (c) => {
    try {
      const saga = await service.retryCompensation(c.req.param('sagaId'));
      return c.json({ status: saga.status });
    } catch (error) {
      return c.json({ error: (error as Error).message }, 400);
    }
  });

  app.get('/health', (c) => c.json({ status: 'healthy' }));

  return app;
}

Entry Point

El Entry Point es donde todo se conecta. Aquí se:

packages/saga-orchestrator/src/index.ts

import { serve } from '@hono/node-server';
import postgres from 'postgres';
import { SagaRepository } from './repository/saga-repository.js';
import { SagaEngine } from './engine/saga-engine.js';
import { StepExecutor } from './engine/step-executor.js';
import { OrchestratorService } from './service/orchestrator-service.js';
import { createRoutes } from './api/routes.js';
import { orderSagaDefinition } from './definition/order-saga-definition.js';
import {
  OrderServiceClient,
  InventoryServiceClient,
  PaymentServiceClient
} from './client/service-clients.js';

const DATABASE_URL = process.env.DATABASE_URL || 'postgres://orderflow:orderflow@localhost:5432/orderflow';
const PORT = parseInt(process.env.PORT || '3000');

const ORDER_SERVICE_URL = process.env.ORDER_SERVICE_URL || 'http://localhost:3001';
const INVENTORY_SERVICE_URL = process.env.INVENTORY_SERVICE_URL || 'http://localhost:3002';
const PAYMENT_SERVICE_URL = process.env.PAYMENT_SERVICE_URL || 'http://localhost:3003';

async function main() {
  const sql = postgres(DATABASE_URL);
  const repository = new SagaRepository(sql);
  const stepExecutor = new StepExecutor();
  const engine = new SagaEngine(repository, stepExecutor);

  // Registrar definición de saga
  engine.registerDefinition(orderSagaDefinition);

  // Registrar clientes de servicio
  engine.registerClient('order-service', new OrderServiceClient(ORDER_SERVICE_URL));
  engine.registerClient('inventory-service', new InventoryServiceClient(INVENTORY_SERVICE_URL));
  engine.registerClient('payment-service', new PaymentServiceClient(PAYMENT_SERVICE_URL));

  const service = new OrchestratorService(engine, repository);
  const app = createRoutes(service);

  // Reanudar sagas pendientes al iniciar
  console.log('Resuming pending sagas...');
  await engine.resumePendingSagas();

  console.log(`Saga Orchestrator running on port ${PORT}`);
  serve({ fetch: app.fetch, port: PORT });
}

main().catch(console.error);

Ejemplo de Uso

# Crear una orden
curl -X POST http://localhost:3000/sagas/order \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "550e8400-e29b-41d4-a716-446655440000",
    "items": [
      {
        "productId": "550e8400-e29b-41d4-a716-446655440001",
        "quantity": 2,
        "unitPrice": 29.99
      }
    ],
    "shippingAddress": {
      "street": "123 Main St",
      "city": "Santiago",
      "country": "Chile",
      "zipCode": "8320000"
    }
  }'

# Respuesta
{
  "sagaId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "orderId": "f0e1d2c3-b4a5-6789-0fed-cba987654321",
  "status": "completed"
}

# Consultar estado de la saga
curl http://localhost:3000/sagas/a1b2c3d4-e5f6-7890-abcd-ef1234567890

Resumen

Glosario

Saga Engine

Definición: Componente central que coordina la ejecución de sagas, gestionando el flujo de pasos, actualizando estados, y orquestando compensaciones cuando ocurren fallos.

Por qué es importante: Es el “cerebro” del patrón Saga orquestado. Toda la lógica de coordinación está centralizada aquí, facilitando el mantenimiento y la depuración.

Ejemplo práctico: engine.startSaga('create-order', context) inicia una nueva saga. El engine ejecuta paso a paso, guarda el estado después de cada transición, y maneja compensaciones si algo falla.


Step Executor

Definición: Componente responsable de ejecutar un paso individual de la saga, aplicando la política de reintentos y respetando el timeout configurado.

Por qué es importante: Separa la lógica de “cómo ejecutar un paso” de “qué pasos ejecutar”. El engine se enfoca en la secuencia; el executor se enfoca en la ejecución robusta.

Ejemplo práctico: stepExecutor.execute(stepDef, client, context) intenta ejecutar el paso hasta 3 veces con backoff exponencial, respetando el timeout de 5 segundos.


Payload Builder

Definición: Lógica que construye los datos de entrada para cada paso basándose en el contexto actual de la saga.

Por qué es importante: Cada servicio espera datos en un formato específico. El builder transforma el contexto general de la saga al formato que cada servicio necesita.

Ejemplo práctico: Para reserveStock, el builder extrae {orderId, items} del contexto. Para processPayment, extrae {orderId, customerId, amount}.


Context Update

Definición: Proceso de agregar los resultados de un paso al contexto de la saga para que estén disponibles para pasos posteriores.

Por qué es importante: Los pasos dependen de datos generados por pasos anteriores. Sin actualizar el contexto, el siguiente paso no tendría la información necesaria.

Ejemplo práctico: Después de createOrder, el resultado {id: 'order-123'} se agrega al contexto como context.orderId = 'order-123'. Ahora reserveStock puede usarlo.


Promise.race

Definición: Método de JavaScript que toma múltiples promesas y devuelve la primera que se resuelva o rechace.

Por qué es importante: Se usa para implementar timeouts. Corremos la operación contra un timer; si el timer gana, significa que la operación tardó demasiado.

Ejemplo práctico: Promise.race([operation(), timeoutPromise]) - si la operación tarda más que el timeout, la promesa de timeout se resuelve primero y lanzamos error.


Dependency Injection (Manual)

Definición: Patrón donde las dependencias de un componente se pasan desde afuera (inyectan) en lugar de crearlas internamente.

Por qué es importante: Facilita el testing (puedes inyectar mocks), el cambio de implementaciones, y hace explícitas las dependencias de cada componente.

Ejemplo práctico: new SagaEngine(repository, stepExecutor) recibe sus dependencias. No las crea internamente. En tests, puedes pasar un mock repository.


Reanudación de Sagas

Definición: Proceso de detectar sagas que quedaron incompletas (por caída del sistema) y continuar su ejecución desde donde quedaron.

Por qué es importante: Garantiza que ninguna saga se pierda. Incluso si el servidor se reinicia a mitad de una saga, esta se completará eventualmente.

Ejemplo práctico: Al iniciar, engine.resumePendingSagas() busca sagas con status IN ('started', 'step_executing', 'compensating') y las continúa.


Facade Pattern

Definición: Patrón que proporciona una interfaz simplificada a un subsistema complejo, ocultando su complejidad interna.

Por qué es importante: El OrchestratorService actúa como fachada. Los consumidores solo necesitan llamar a createOrder(request), sin conocer el engine, los clientes, o el repositorio.

Ejemplo práctico: service.createOrder(request) internamente crea el contexto, llama al engine, y devuelve la saga. El consumidor no ve esa complejidad.


Configuración por Entorno

Definición: Usar variables de entorno para configurar el comportamiento de la aplicación según el ambiente (desarrollo, testing, producción).

Por qué es importante: Permite desplegar el mismo código en diferentes ambientes con diferentes configuraciones (URLs de servicios, credenciales, etc.).

Ejemplo práctico: process.env.ORDER_SERVICE_URL || 'http://localhost:3001' usa la variable de entorno si existe, o un valor por defecto para desarrollo local.


Graceful Startup

Definición: Proceso de inicialización donde el sistema se asegura de estar completamente listo antes de aceptar tráfico, incluyendo reconectar con trabajos pendientes.

Por qué es importante: Evita perder datos y garantiza continuidad. Las sagas pendientes se reanudan antes de que lleguen nuevas solicitudes.

Ejemplo práctico: El entry point primero conecta a la base de datos, luego reanuda sagas pendientes, y finalmente inicia el servidor HTTP para aceptar nuevas solicitudes.


Logging Estructurado

Definición: Práctica de registrar mensajes de log con datos estructurados (JSON) en lugar de solo texto plano, facilitando el análisis y búsqueda.

Por qué es importante: Permite filtrar logs por sagaId, orderId, stepName, etc. Herramientas como ELK pueden indexar y buscar eficientemente.

Ejemplo práctico: En lugar de console.log('Step failed'), usar console.log({event: 'step_failed', sagaId, stepName, error: error.message}).


← Capítulo 11: Orchestrator - Diseño | Capítulo 13: Saga State →