Capítulo 11: Saga Orchestrator - Diseño
Capítulo 11: Saga Orchestrator - Diseño
“El director de la orquesta distribuida”
Introducción
El Saga Orchestrator es el componente central que coordina toda la ejecución de las sagas. Este capítulo cubre el diseño arquitectónico antes de pasar a la implementación en el siguiente capítulo.
Un orquestador bien diseñado debe:
- Conocer la definición de cada tipo de saga (qué pasos tiene, en qué orden).
- Mantener el estado de cada instancia de saga en ejecución.
- Saber cómo comunicarse con cada servicio participante.
- Gestionar tanto la ejecución exitosa como las compensaciones.
Responsabilidades del Orquestador
graph TD
O[Orchestrator] --> R[Recibir solicitudes]
O --> C[Coordinar pasos]
O --> P[Persistir estado]
O --> E[Emitir eventos]
O --> CO[Compensar fallos]
O --> M[Monitorear progreso]
Modelo de Estado de Saga
El estado de una saga se representa como una máquina de estados finitos. Cada saga está en exactamente un estado en cualquier momento, y solo ciertas transiciones son válidas.
Máquina de Estados
stateDiagram-v2
[*] --> Started
Started --> StepExecuting: execute_step
StepExecuting --> StepCompleted: step_success
StepExecuting --> Compensating: step_failed
StepCompleted --> StepExecuting: next_step
StepCompleted --> Completed: all_steps_done
Compensating --> Compensated: all_compensated
Compensating --> CompensationFailed: compensation_error
CompensationFailed --> Compensating: retry
Completed --> [*]
Compensated --> [*]
Definición de Estado
Separamos dos conceptos importantes:
- SagaDefinition: La plantilla o receta de una saga. Define qué pasos tiene y cómo ejecutarlos. Es estática.
- SagaInstance: Una ejecución específica de una saga para un pedido particular. Es dinámica y tiene estado.
export interface SagaDefinition {
name: string;
steps: SagaStepDefinition[];
}
export interface SagaStepDefinition {
name: string;
service: string;
action: string;
compensationAction: string;
timeout: number;
retryPolicy: RetryPolicy;
}
export interface RetryPolicy {
maxAttempts: number;
backoffMs: number;
maxBackoffMs: number;
}
export interface SagaInstance {
id: string;
definitionName: string;
orderId: string;
status: SagaStatus;
currentStepIndex: number;
context: SagaContext;
stepResults: StepResult[];
startedAt: Date;
completedAt: Date | null;
error: string | null;
}
export type SagaStatus =
| 'started'
| 'step_executing'
| 'step_completed'
| 'completed'
| 'compensating'
| 'compensated'
| 'compensation_failed';
export interface SagaContext {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; unitPrice: number }>;
total: number;
reservationId?: string;
paymentId?: string;
[key: string]: unknown;
}
export interface StepResult {
stepName: string;
status: 'pending' | 'completed' | 'failed' | 'compensated';
executedAt: Date | null;
completedAt: Date | null;
result: unknown;
error: string | null;
}
Arquitectura del Orquestador
packages/saga-orchestrator/
├── src/
│ ├── definition/
│ │ └── order-saga-definition.ts
│ ├── engine/
│ │ ├── saga-engine.ts
│ │ └── step-executor.ts
│ ├── repository/
│ │ └── saga-repository.ts
│ ├── service/
│ │ └── orchestrator-service.ts
│ ├── client/
│ │ └── service-clients.ts
│ ├── compensation/
│ │ ├── compensation-log.ts
│ │ └── compensation-executor.ts
│ ├── api/
│ │ └── routes.ts
│ └── index.ts
└── package.json
Definición de Saga de Orden
packages/saga-orchestrator/src/definition/order-saga-definition.ts
import type { SagaDefinition } from '../types.js';
export const orderSagaDefinition: SagaDefinition = {
name: 'create-order',
steps: [
{
name: 'createOrder',
service: 'order-service',
action: 'create',
compensationAction: 'cancel',
timeout: 5000,
retryPolicy: {
maxAttempts: 3,
backoffMs: 1000,
maxBackoffMs: 5000
}
},
{
name: 'reserveStock',
service: 'inventory-service',
action: 'reserve',
compensationAction: 'release',
timeout: 10000,
retryPolicy: {
maxAttempts: 3,
backoffMs: 500,
maxBackoffMs: 3000
}
},
{
name: 'processPayment',
service: 'payment-service',
action: 'process',
compensationAction: 'refund',
timeout: 30000,
retryPolicy: {
maxAttempts: 2,
backoffMs: 2000,
maxBackoffMs: 10000
}
},
{
name: 'completeOrder',
service: 'order-service',
action: 'complete',
compensationAction: 'none',
timeout: 5000,
retryPolicy: {
maxAttempts: 3,
backoffMs: 1000,
maxBackoffMs: 5000
}
}
]
};
Clientes de Servicio
Los Service Clients encapsulan la comunicación con cada microservicio. Implementan una interfaz común (ServiceClient) que el orquestador usa para ejecutar acciones y compensaciones.
Esta abstracción permite:
- Cambiar la implementación de comunicación (HTTP, gRPC, mensajes) sin afectar al orquestador.
- Agregar lógica transversal (logging, métricas, reintentos) en un solo lugar.
- Facilitar el testing con mocks.
packages/saga-orchestrator/src/client/service-clients.ts
export interface ServiceClient {
execute(action: string, payload: unknown): Promise<unknown>;
compensate(action: string, payload: unknown): Promise<void>;
}
export class OrderServiceClient implements ServiceClient {
constructor(private baseUrl: string) {}
async execute(action: string, payload: unknown): Promise<unknown> {
const p = payload as { customerId: string; items: unknown[]; shippingAddress: unknown };
switch (action) {
case 'create':
return this.createOrder(p);
case 'complete':
return this.completeOrder((payload as { orderId: string }).orderId);
default:
throw new Error(`Unknown action: ${action}`);
}
}
async compensate(action: string, payload: unknown): Promise<void> {
if (action === 'cancel') {
await this.cancelOrder((payload as { orderId: string }).orderId);
}
}
private async createOrder(data: { customerId: string; items: unknown[]; shippingAddress: unknown }) {
const response = await fetch(`${this.baseUrl}/orders`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
});
if (!response.ok) throw new Error(`Create order failed: ${response.statusText}`);
return response.json();
}
private async completeOrder(orderId: string) {
const response = await fetch(`${this.baseUrl}/orders/${orderId}/status`, {
method: 'PATCH',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ status: 'completed' })
});
if (!response.ok) throw new Error(`Complete order failed: ${response.statusText}`);
return response.json();
}
private async cancelOrder(orderId: string) {
await fetch(`${this.baseUrl}/orders/${orderId}/cancel`, { method: 'POST' });
}
}
export class InventoryServiceClient implements ServiceClient {
constructor(private baseUrl: string) {}
async execute(action: string, payload: unknown): Promise<unknown> {
if (action === 'reserve') {
const p = payload as { orderId: string; items: Array<{ productId: string; quantity: number }> };
const response = await fetch(`${this.baseUrl}/reservations`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ orderId: p.orderId, items: p.items })
});
if (!response.ok) throw new Error(`Reserve stock failed: ${response.statusText}`);
return response.json();
}
throw new Error(`Unknown action: ${action}`);
}
async compensate(action: string, payload: unknown): Promise<void> {
if (action === 'release') {
const orderId = (payload as { orderId: string }).orderId;
await fetch(`${this.baseUrl}/reservations/${orderId}`, { method: 'DELETE' });
}
}
}
export class PaymentServiceClient implements ServiceClient {
constructor(private baseUrl: string) {}
async execute(action: string, payload: unknown): Promise<unknown> {
if (action === 'process') {
const p = payload as { orderId: string; customerId: string; amount: number };
const response = await fetch(`${this.baseUrl}/payments`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(p)
});
if (!response.ok) {
const error = await response.json();
throw new Error(error.error || 'Payment failed');
}
return response.json();
}
throw new Error(`Unknown action: ${action}`);
}
async compensate(action: string, payload: unknown): Promise<void> {
if (action === 'refund') {
const orderId = (payload as { orderId: string }).orderId;
await fetch(`${this.baseUrl}/payments/${orderId}/refund`, { method: 'POST' });
}
}
}
Repositorio de Sagas
El SagaRepository persiste el estado de cada instancia de saga. Esto es crucial para:
- Recuperación ante fallos: Si el orquestador se cae, al reiniciar puede consultar las sagas pendientes y continuar.
- Visibilidad: Podemos consultar el estado de cualquier saga en cualquier momento.
- Auditoría: Tenemos un registro histórico de todas las sagas ejecutadas.
packages/saga-orchestrator/src/repository/saga-repository.ts
import postgres from 'postgres';
import type { SagaInstance, SagaStatus, StepResult, SagaContext } from '../types.js';
export class SagaRepository {
constructor(private sql: postgres.Sql) {}
async create(saga: SagaInstance): Promise<void> {
await this.sql.begin(async (tx) => {
await tx`
INSERT INTO sagas (id, definition_name, order_id, status, current_step_index, context, started_at)
VALUES (
${saga.id}, ${saga.definitionName}, ${saga.orderId},
${saga.status}, ${saga.currentStepIndex},
${JSON.stringify(saga.context)}, ${saga.startedAt}
)
`;
for (const step of saga.stepResults) {
await tx`
INSERT INTO saga_steps (id, saga_id, step_name, status)
VALUES (${crypto.randomUUID()}, ${saga.id}, ${step.stepName}, ${step.status})
`;
}
});
}
async update(saga: SagaInstance): Promise<void> {
await this.sql.begin(async (tx) => {
await tx`
UPDATE sagas SET
status = ${saga.status},
current_step_index = ${saga.currentStepIndex},
context = ${JSON.stringify(saga.context)},
completed_at = ${saga.completedAt},
error = ${saga.error}
WHERE id = ${saga.id}
`;
for (const step of saga.stepResults) {
await tx`
UPDATE saga_steps SET
status = ${step.status},
executed_at = ${step.executedAt},
completed_at = ${step.completedAt},
result = ${JSON.stringify(step.result)},
error = ${step.error}
WHERE saga_id = ${saga.id} AND step_name = ${step.stepName}
`;
}
});
}
async findById(id: string): Promise<SagaInstance | null> {
const [row] = await this.sql`
SELECT id, definition_name, order_id, status, current_step_index,
context, started_at, completed_at, error
FROM sagas WHERE id = ${id}
`;
if (!row) return null;
const steps = await this.sql`
SELECT step_name, status, executed_at, completed_at, result, error
FROM saga_steps WHERE saga_id = ${id}
ORDER BY executed_at ASC NULLS LAST
`;
return {
id: row.id,
definitionName: row.definition_name,
orderId: row.order_id,
status: row.status as SagaStatus,
currentStepIndex: row.current_step_index,
context: row.context as SagaContext,
stepResults: steps.map(s => ({
stepName: s.step_name,
status: s.status,
executedAt: s.executed_at,
completedAt: s.completed_at,
result: s.result,
error: s.error
})),
startedAt: row.started_at,
completedAt: row.completed_at,
error: row.error
};
}
async findByOrderId(orderId: string): Promise<SagaInstance | null> {
const [row] = await this.sql`
SELECT id FROM sagas WHERE order_id = ${orderId}
ORDER BY started_at DESC LIMIT 1
`;
if (!row) return null;
return this.findById(row.id);
}
async findPendingSagas(): Promise<SagaInstance[]> {
const rows = await this.sql`
SELECT id FROM sagas
WHERE status IN ('started', 'step_executing', 'compensating')
ORDER BY started_at ASC
`;
const sagas: SagaInstance[] = [];
for (const row of rows) {
const saga = await this.findById(row.id);
if (saga) sagas.push(saga);
}
return sagas;
}
}
Diagrama de Flujo Completo
sequenceDiagram
participant C as Client
participant O as Orchestrator
participant R as Repository
participant OS as Order Service
participant IS as Inventory Service
participant PS as Payment Service
C->>O: POST /sagas/order
O->>R: Create saga (started)
O->>OS: Create order
OS-->>O: Order created
O->>R: Update step (completed)
O->>IS: Reserve stock
IS-->>O: Stock reserved
O->>R: Update step (completed)
O->>PS: Process payment
PS-->>O: Payment processed
O->>R: Update step (completed)
O->>OS: Complete order
OS-->>O: Order completed
O->>R: Update saga (completed)
O-->>C: Saga completed
Resumen
- Definición declarativa de pasos con políticas de reintento
- Clientes tipados para cada servicio
- Repositorio para persistir estado de saga
- Máquina de estados clara con transiciones válidas
- Arquitectura preparada para implementación del motor
Glosario
Saga Definition
Definición: Plantilla que describe la estructura de una saga: qué pasos tiene, en qué orden, qué acciones ejecutar, y cómo compensar cada paso.
Por qué es importante: Separa la configuración de la ejecución. Puedes tener múltiples definiciones de sagas (crear pedido, cancelar pedido, actualizar pedido) reutilizando la misma infraestructura.
Ejemplo práctico: orderSagaDefinition define que la saga tiene 4 pasos: createOrder, reserveStock, processPayment, completeOrder. Cada paso especifica su servicio, timeout, y política de reintento.
Saga Instance
Definición: Una ejecución específica de una saga para una transacción particular. Tiene su propio estado, contexto, y progreso.
Por qué es importante: Múltiples pedidos pueden estar procesándose simultáneamente, cada uno como su propia instancia de saga independiente.
Ejemplo práctico: El pedido “123” tiene una instancia de saga con id='abc', status='step_executing', currentStepIndex=2 (procesando pago). El pedido “456” tiene otra instancia diferente.
Saga Context
Definición: Datos que se acumulan durante la ejecución de la saga y se pasan entre pasos. Contiene tanto los datos de entrada como los resultados intermedios.
Por qué es importante: Permite que pasos posteriores usen información generada por pasos anteriores. Por ejemplo, el paso de pago necesita el orderId creado en el paso anterior.
Ejemplo práctico: Inicia con {customerId, items}. Después de crear orden: {customerId, items, orderId}. Después de reservar: {customerId, items, orderId, reservationId}.
Step Result
Definición: Registro del resultado de la ejecución de un paso específico: si tuvo éxito, cuándo se ejecutó, qué resultado devolvió, o qué error ocurrió.
Por qué es importante: Permite rastrear el progreso detallado de la saga y facilita la depuración cuando algo falla.
Ejemplo práctico: {stepName: 'reserveStock', status: 'completed', executedAt: '...', result: {reservationId: 'xyz'}, error: null}.
Service Client
Definición: Componente que encapsula la comunicación con un microservicio específico, exponiendo métodos para ejecutar acciones y compensaciones.
Por qué es importante: Abstrae los detalles de comunicación (HTTP, serialización, manejo de errores) del orquestador. Facilita cambiar la implementación sin afectar la lógica de la saga.
Ejemplo práctico: OrderServiceClient.execute('create', {customerId, items}) internamente hace un POST HTTP a /orders, pero el orquestador no necesita saber eso.
Retry Policy
Definición: Configuración que define cómo reintentar operaciones fallidas: número máximo de intentos, tiempo base de espera, y tiempo máximo de espera.
Por qué es importante: Diferentes pasos pueden necesitar diferentes políticas. El pago puede tolerar reintentos más largos que la reserva de stock.
Ejemplo práctico: {maxAttempts: 3, backoffMs: 1000, maxBackoffMs: 5000} significa: reintentar hasta 3 veces, esperar 1s entre el 1er y 2do intento, 2s entre el 2do y 3ero, sin exceder 5s.
Timeout de Paso
Definición: Tiempo máximo que el orquestador espera por la respuesta de un paso antes de considerarlo fallido.
Por qué es importante: Evita que una saga quede bloqueada indefinidamente esperando un servicio que no responde. Permite detectar fallos y comenzar compensaciones.
Ejemplo práctico: processPayment tiene timeout: 30000 (30 segundos) porque los pagos pueden ser lentos. createOrder tiene timeout: 5000 porque debería ser rápido.
Máquina de Estados Finitos (FSM)
Definición: Modelo matemático donde un sistema puede estar en uno de un número finito de estados, y las transiciones entre estados ocurren en respuesta a eventos.
Por qué es importante: Hace explícito y verificable qué estados son válidos y qué transiciones son permitidas. Previene estados inconsistentes.
Ejemplo práctico: Una saga solo puede pasar de step_completed a step_executing o completed. No puede saltar directamente de started a completed.
Persistencia de Estado
Definición: Guardar el estado de la saga en almacenamiento duradero (base de datos) en cada transición, para poder recuperarlo si el sistema se reinicia.
Por qué es importante: Sin persistencia, si el orquestador se cae, todas las sagas en proceso se perderían. Con persistencia, puede retomar donde quedó.
Ejemplo práctico: Después de cada await repository.update(saga), el estado actual está guardado en PostgreSQL. Si el proceso muere y reinicia, findPendingSagas() encuentra las sagas incompletas.
Definición Declarativa
Definición: Especificar qué queremos lograr en lugar de cómo lograrlo. Describir la estructura de la saga en datos, no en código imperativo.
Por qué es importante: Facilita entender, modificar, y validar la saga. Podrías incluso cargar definiciones de un archivo de configuración o base de datos.
Ejemplo práctico: {name: 'reserveStock', service: 'inventory-service', action: 'reserve', compensationAction: 'release'} describe el paso sin escribir código de cómo ejecutarlo.
← Capítulo 10: Compensaciones | Capítulo 12: Orchestrator - Implementación →