Capítulo 10: Implementación de Compensaciones
Capítulo 10: Implementación de Compensaciones
“El arte de deshacer con gracia”
Introducción
Este capítulo implementa el sistema de compensaciones del orquestador. Hasta ahora, cada servicio sabe cómo revertir sus operaciones individualmente. Ahora veremos cómo el orquestador coordina todas las compensaciones cuando una saga falla.
Los componentes clave son:
- CompensationLog: Registro persistente de compensaciones pendientes.
- CompensationExecutor: Motor que ejecuta los handlers de compensación.
- Handlers específicos: Implementaciones concretas para cada tipo de compensación.
- Worker: Proceso que reintenta compensaciones fallidas.
Flujo de Compensación
sequenceDiagram
participant O as Orchestrator
participant OS as Order Service
participant IS as Inventory Service
participant PS as Payment Service
Note over O,PS: Escenario: Pago falla después de reservar stock
O->>OS: Create Order
OS-->>O: Order Created
O->>IS: Reserve Stock
IS-->>O: Stock Reserved
O->>PS: Process Payment
PS-->>O: Payment Failed
Note over O,PS: Iniciar compensación
O->>IS: Release Stock (compensar)
IS-->>O: Stock Released
O->>OS: Cancel Order (compensar)
OS-->>O: Order Cancelled
Registro de Compensaciones
El CompensationLog es una tabla de base de datos que registra cada compensación que debe ejecutarse. Esto garantiza que incluso si el orquestador se cae a mitad de una compensación, al reiniciar puede ver qué compensaciones quedaron pendientes.
Este es un ejemplo del patrón Outbox: en lugar de ejecutar directamente la compensación, la registramos primero para no perderla.
packages/saga-orchestrator/src/compensation/compensation-log.ts
import postgres from 'postgres';
export type CompensationStatus = 'pending' | 'in_progress' | 'completed' | 'failed';
export interface CompensationRecord {
id: string;
sagaId: string;
stepName: string;
payload: Record<string, unknown>;
status: CompensationStatus;
attempts: number;
lastError: string | null;
createdAt: Date;
completedAt: Date | null;
}
export class CompensationLog {
constructor(private sql: postgres.Sql) {}
async record(sagaId: string, stepName: string, payload: Record<string, unknown>): Promise<string> {
const id = crypto.randomUUID();
await this.sql`
INSERT INTO compensation_log (id, saga_id, step_name, payload, status, attempts, created_at)
VALUES (${id}, ${sagaId}, ${stepName}, ${JSON.stringify(payload)}, 'pending', 0, NOW())
`;
return id;
}
async markInProgress(id: string): Promise<void> {
await this.sql`
UPDATE compensation_log
SET status = 'in_progress', attempts = attempts + 1
WHERE id = ${id}
`;
}
async markCompleted(id: string): Promise<void> {
await this.sql`
UPDATE compensation_log
SET status = 'completed', completed_at = NOW()
WHERE id = ${id}
`;
}
async markFailed(id: string, error: string): Promise<void> {
await this.sql`
UPDATE compensation_log
SET status = 'failed', last_error = ${error}
WHERE id = ${id}
`;
}
async getPending(limit: number = 10): Promise<CompensationRecord[]> {
return this.sql`
SELECT id, saga_id, step_name, payload, status, attempts, last_error, created_at, completed_at
FROM compensation_log
WHERE status IN ('pending', 'failed')
AND attempts < 5
ORDER BY created_at ASC
LIMIT ${limit}
`;
}
async getBySaga(sagaId: string): Promise<CompensationRecord[]> {
return this.sql`
SELECT id, saga_id, step_name, payload, status, attempts, last_error, created_at, completed_at
FROM compensation_log
WHERE saga_id = ${sagaId}
ORDER BY created_at DESC
`;
}
}
Executor de Compensaciones
El CompensationExecutor es responsable de ejecutar las compensaciones registradas. Funciona con un sistema de plugins: cada tipo de paso tiene un handler registrado que sabe cómo ejecutar su compensación.
El patrón Strategy permite agregar nuevos tipos de compensación sin modificar el executor.
packages/saga-orchestrator/src/compensation/compensation-executor.ts
import { CompensationLog, type CompensationRecord } from './compensation-log.js';
export interface CompensationHandler {
stepName: string;
execute(payload: Record<string, unknown>): Promise<void>;
}
export class CompensationExecutor {
private handlers = new Map<string, CompensationHandler>();
constructor(private log: CompensationLog) {}
registerHandler(handler: CompensationHandler): void {
this.handlers.set(handler.stepName, handler);
}
async executeCompensation(record: CompensationRecord): Promise<void> {
const handler = this.handlers.get(record.stepName);
if (!handler) {
throw new Error(`No handler for compensation step: ${record.stepName}`);
}
await this.log.markInProgress(record.id);
try {
await handler.execute(record.payload);
await this.log.markCompleted(record.id);
} catch (error) {
await this.log.markFailed(record.id, (error as Error).message);
throw error;
}
}
async executePendingCompensations(): Promise<{ processed: number; failed: number }> {
const pending = await this.log.getPending();
let processed = 0;
let failed = 0;
for (const record of pending) {
try {
await this.executeCompensation(record);
processed++;
} catch {
failed++;
}
}
return { processed, failed };
}
}
Handlers de Compensación
Los handlers son implementaciones específicas para cada tipo de compensación. Cada handler sabe cómo comunicarse con su servicio correspondiente para ejecutar la compensación.
Note que todos los handlers manejan el caso 404 (no encontrado) como éxito, ya que significa que el recurso ya no existe, lo cual es el estado deseado después de la compensación.
packages/saga-orchestrator/src/compensation/handlers/release-stock-handler.ts
import type { CompensationHandler } from '../compensation-executor.js';
export class ReleaseStockHandler implements CompensationHandler {
stepName = 'reserveStock';
constructor(private inventoryServiceUrl: string) {}
async execute(payload: Record<string, unknown>): Promise<void> {
const orderId = payload.orderId as string;
const response = await fetch(`${this.inventoryServiceUrl}/reservations/${orderId}`, {
method: 'DELETE'
});
if (!response.ok && response.status !== 404) {
throw new Error(`Failed to release stock: ${response.statusText}`);
}
}
}
packages/saga-orchestrator/src/compensation/handlers/refund-payment-handler.ts
import type { CompensationHandler } from '../compensation-executor.js';
export class RefundPaymentHandler implements CompensationHandler {
stepName = 'processPayment';
constructor(private paymentServiceUrl: string) {}
async execute(payload: Record<string, unknown>): Promise<void> {
const orderId = payload.orderId as string;
const response = await fetch(`${this.paymentServiceUrl}/payments/${orderId}/refund`, {
method: 'POST'
});
if (!response.ok && response.status !== 404) {
throw new Error(`Failed to refund payment: ${response.statusText}`);
}
}
}
packages/saga-orchestrator/src/compensation/handlers/cancel-order-handler.ts
import type { CompensationHandler } from '../compensation-executor.js';
export class CancelOrderHandler implements CompensationHandler {
stepName = 'createOrder';
constructor(private orderServiceUrl: string) {}
async execute(payload: Record<string, unknown>): Promise<void> {
const orderId = payload.orderId as string;
const response = await fetch(`${this.orderServiceUrl}/orders/${orderId}/cancel`, {
method: 'POST'
});
if (!response.ok && response.status !== 404) {
throw new Error(`Failed to cancel order: ${response.statusText}`);
}
}
}
Saga con Compensación Integrada
La clase CompensableSaga integra la ejecución de pasos con el registro automático de compensaciones. Cada vez que un paso se completa exitosamente, su compensación se registra inmediatamente.
El método compensate() ejecuta las compensaciones en orden inverso (LIFO), que es el orden correcto para deshacer operaciones que pueden tener dependencias.
packages/saga-orchestrator/src/saga/compensable-saga.ts
import { CompensationLog } from '../compensation/compensation-log.js';
import { CompensationExecutor } from '../compensation/compensation-executor.js';
interface SagaStep<TContext> {
name: string;
execute(context: TContext): Promise<void>;
getCompensationPayload(context: TContext): Record<string, unknown>;
}
export class CompensableSaga<TContext extends { orderId: string }> {
private steps: SagaStep<TContext>[] = [];
private completedSteps: string[] = [];
constructor(
private sagaId: string,
private compensationLog: CompensationLog,
private compensationExecutor: CompensationExecutor
) {}
addStep(step: SagaStep<TContext>): this {
this.steps.push(step);
return this;
}
async execute(context: TContext): Promise<void> {
for (const step of this.steps) {
try {
await step.execute(context);
this.completedSteps.push(step.name);
// Registrar compensación pendiente
await this.compensationLog.record(
this.sagaId,
step.name,
step.getCompensationPayload(context)
);
} catch (error) {
console.error(`Step ${step.name} failed:`, error);
await this.compensate(context);
throw error;
}
}
// Saga completada, limpiar compensaciones pendientes
await this.clearCompensations();
}
private async compensate(context: TContext): Promise<void> {
const compensations = await this.compensationLog.getBySaga(this.sagaId);
// Ordenar en reversa (LIFO)
const pendingCompensations = compensations
.filter(c => c.status === 'pending')
.reverse();
for (const compensation of pendingCompensations) {
try {
await this.compensationExecutor.executeCompensation(compensation);
} catch (error) {
console.error(`Compensation ${compensation.stepName} failed:`, error);
// Continuar con las demás compensaciones
}
}
}
private async clearCompensations(): Promise<void> {
const compensations = await this.compensationLog.getBySaga(this.sagaId);
for (const comp of compensations) {
if (comp.status === 'pending') {
await this.compensationLog.markCompleted(comp.id);
}
}
}
}
Worker de Compensaciones Fallidas
El CompensationWorker es un proceso en segundo plano que se ejecuta periódicamente buscando compensaciones que fallaron y necesitan reintentarse.
Este patrón se conoce como Background Job o Scheduled Task. Garantiza que eventualmente todas las compensaciones se completen, incluso si fallaron inicialmente por problemas temporales.
packages/saga-orchestrator/src/workers/compensation-worker.ts
import { CompensationExecutor } from '../compensation/compensation-executor.js';
export class CompensationWorker {
private running = false;
private intervalId: ReturnType<typeof setInterval> | null = null;
constructor(
private executor: CompensationExecutor,
private intervalMs: number = 30000
) {}
start(): void {
if (this.running) return;
this.running = true;
this.intervalId = setInterval(() => this.process(), this.intervalMs);
console.log('Compensation worker started');
}
stop(): void {
if (!this.running) return;
this.running = false;
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
console.log('Compensation worker stopped');
}
private async process(): Promise<void> {
try {
const result = await this.executor.executePendingCompensations();
if (result.processed > 0 || result.failed > 0) {
console.log(`Compensations: ${result.processed} processed, ${result.failed} failed`);
}
} catch (error) {
console.error('Compensation worker error:', error);
}
}
}
Schema de Base de Datos
CREATE TABLE IF NOT EXISTS compensation_log (
id UUID PRIMARY KEY,
saga_id UUID NOT NULL,
step_name VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
);
CREATE INDEX idx_compensation_status ON compensation_log(status);
CREATE INDEX idx_compensation_saga ON compensation_log(saga_id);
Uso Integrado
// Configurar executor con handlers
const compensationLog = new CompensationLog(sql);
const compensationExecutor = new CompensationExecutor(compensationLog);
compensationExecutor.registerHandler(new ReleaseStockHandler(INVENTORY_SERVICE_URL));
compensationExecutor.registerHandler(new RefundPaymentHandler(PAYMENT_SERVICE_URL));
compensationExecutor.registerHandler(new CancelOrderHandler(ORDER_SERVICE_URL));
// Iniciar worker de reintento
const compensationWorker = new CompensationWorker(compensationExecutor);
compensationWorker.start();
// Crear saga con compensación
const saga = new CompensableSaga(sagaId, compensationLog, compensationExecutor);
saga
.addStep({
name: 'createOrder',
execute: async (ctx) => { /* ... */ },
getCompensationPayload: (ctx) => ({ orderId: ctx.orderId })
})
.addStep({
name: 'reserveStock',
execute: async (ctx) => { /* ... */ },
getCompensationPayload: (ctx) => ({ orderId: ctx.orderId })
})
.addStep({
name: 'processPayment',
execute: async (ctx) => { /* ... */ },
getCompensationPayload: (ctx) => ({ orderId: ctx.orderId })
});
await saga.execute(context);
Resumen
- CompensationLog registra compensaciones pendientes
- CompensationExecutor ejecuta handlers de compensación
- Handlers específicos para cada tipo de compensación
- Worker de reintento procesa compensaciones fallidas
- Compensaciones idempotentes y tolerantes a fallos
Glosario
Compensation Log
Definición: Tabla de base de datos que registra cada compensación que debe ejecutarse, incluyendo su estado, intentos, y errores.
Por qué es importante: Garantiza durabilidad de las compensaciones. Si el orquestador se cae, al reiniciar puede consultar qué compensaciones quedaron pendientes y continuarlas.
Ejemplo práctico: Al ejecutar exitosamente reserveStock, se inserta un registro en compensation_log con step_name='reserveStock', status='pending', y payload={orderId: '123'}.
Compensation Handler
Definición: Componente que implementa la lógica específica de compensación para un tipo de paso de saga.
Por qué es importante: Separa la lógica de compensación del executor. Cada handler conoce los detalles de su servicio y cómo comunicarse con él para compensar.
Ejemplo práctico: ReleaseStockHandler sabe que para compensar reserveStock debe llamar a DELETE /reservations/:orderId en el Inventory Service.
Strategy Pattern
Definición: Patrón de diseño donde un algoritmo se encapsula en una clase separada, permitiendo intercambiar implementaciones sin cambiar el código que las usa.
Por qué es importante: El CompensationExecutor puede manejar cualquier tipo de compensación sin conocer los detalles. Solo necesita un handler registrado para cada tipo.
Ejemplo práctico: executor.registerHandler(new ReleaseStockHandler(...)) registra una estrategia. El executor llama a handler.execute(payload) sin saber qué hace internamente.
Background Worker
Definición: Proceso que se ejecuta de forma independiente y periódica, procesando tareas que no requieren respuesta inmediata.
Por qué es importante: Permite procesar trabajo asíncrono sin bloquear las solicitudes principales. Los fallos temporales se manejan con reintentos en segundo plano.
Ejemplo práctico: El CompensationWorker cada 30 segundos consulta compensaciones fallidas con menos de 5 intentos y las reintenta, sin afectar el flujo principal de la aplicación.
Scheduled Task
Definición: Tarea programada para ejecutarse a intervalos regulares o en momentos específicos.
Por qué es importante: Automatiza procesos de mantenimiento, limpieza, y reintentos sin intervención manual.
Ejemplo práctico: setInterval(() => this.process(), 30000) ejecuta el procesamiento de compensaciones cada 30 segundos automáticamente.
LIFO (Last In, First Out)
Definición: Estructura de datos o procesamiento donde el último elemento en entrar es el primero en salir. Como una pila de platos.
Por qué es importante: Las compensaciones deben ejecutarse en orden inverso a las operaciones originales. El último paso ejecutado debe ser el primero en compensarse.
Ejemplo práctico: Si ejecutamos T1->T2->T3 y T3 falla, compensamos en orden C2->C1 (no C1->C2). Usamos completedSteps.reverse() para lograr LIFO.
Payload de Compensación
Definición: Datos necesarios para ejecutar una compensación específica, guardados cuando se registra la compensación.
Por qué es importante: Contiene la información que el handler necesita para ejecutar la compensación. Sin estos datos, no sabríamos qué compensar.
Ejemplo práctico: Para compensar reserveStock, necesitamos orderId para llamar a DELETE /reservations/:orderId. Este dato se guarda en el campo payload del log.
Estado de Compensación
Definición: Estado actual de una compensación en su ciclo de vida: pending, in_progress, completed, failed.
Por qué es importante: Permite rastrear qué compensaciones necesitan atención, cuáles están en proceso, y cuáles se completaron.
Ejemplo práctico: Una compensación inicia como pending. Al intentar ejecutarla, pasa a in_progress. Si tiene éxito, pasa a completed. Si falla, vuelve a failed para reintento posterior.
Límite de Reintentos
Definición: Número máximo de veces que se intentará ejecutar una compensación fallida antes de considerarla definitivamente fallida.
Por qué es importante: Evita reintentos infinitos de compensaciones que nunca van a tener éxito. Después del límite, se requiere intervención manual.
Ejemplo práctico: WHERE attempts < 5 en la consulta del worker significa que después de 5 intentos fallidos, la compensación ya no se reintentará automáticamente.
Tolerancia a Fallos (Graceful Degradation)
Definición: Capacidad del sistema para continuar funcionando (posiblemente con funcionalidad reducida) cuando algunos componentes fallan.
Por qué es importante: El sistema de compensaciones continúa procesando otras compensaciones aunque una falle. No se detiene todo porque un servicio esté caído.
Ejemplo práctico: Si la compensación de stock falla, el worker registra el fallo y continúa con la siguiente compensación en la cola. El stock fallido se reintentará después.
← Capítulo 9: Payment Service | Capítulo 11: Orchestrator - Diseño →