Capítulo 13: Estado de la Saga y Persistencia
Capítulo 13: Estado de la Saga y Persistencia
“Una saga sin estado persistente es una saga que no sobrevive reinicios”
Introduccion
Hasta ahora hemos implementado sagas que funcionan bien mientras el servidor esta activo. Pero en sistemas de produccion, los servidores se reinician, fallan o escalan. Este capitulo aborda como persistir el estado de una saga para garantizar su continuidad y recuperacion ante fallos.
La persistencia de estado significa guardar informacion sobre el progreso de la saga en un almacenamiento duradero (como una base de datos), de modo que si el proceso se interrumpe, podamos retomarlo exactamente donde quedo.
El Problema del Estado
Una saga puede fallar en cualquier momento. Sin persistencia:
1. Saga inicia → Paso 1 OK → Paso 2 OK
2. Servidor se reinicia
3. ¿En qué paso estábamos? ¿Qué compensar?
Modelo de Estado
El modelo de estado define la estructura de datos que captura toda la informacion necesaria para rastrear y recuperar una saga. Incluye identificadores, el paso actual, los pasos completados y cualquier contexto necesario para continuar la ejecucion.
interface SagaState {
id: string;
type: 'CREATE_ORDER';
status: 'running' | 'completed' | 'failed' | 'compensating';
currentStep: number;
context: Record<string, unknown>;
completedSteps: StepResult[];
error?: string;
createdAt: Date;
updatedAt: Date;
}
interface StepResult {
name: string;
status: 'completed' | 'compensated' | 'failed';
result?: unknown;
completedAt: Date;
}
Persistencia con PostgreSQL
PostgreSQL es una base de datos relacional que garantiza durabilidad mediante transacciones ACID. Es ideal para persistir el estado de sagas porque:
- Los datos sobreviven a reinicios del servidor
- Las transacciones garantizan consistencia
- Permite consultas complejas para encontrar sagas pendientes
El Repository Pattern (patron repositorio) encapsula la logica de acceso a datos, separando la logica de negocio de los detalles de persistencia. Cada metodo del repositorio representa una operacion sobre la saga almacenada.
// saga-repository.ts
import { Pool } from 'pg';
export class SagaRepository {
constructor(private pool: Pool) {}
async create(saga: SagaState): Promise<void> {
await this.pool.query(
`INSERT INTO sagas (id, type, status, current_step, context, completed_steps)
VALUES ($1, $2, $3, $4, $5, $6)`,
[saga.id, saga.type, saga.status, saga.currentStep,
JSON.stringify(saga.context), JSON.stringify(saga.completedSteps)]
);
}
async update(id: string, updates: Partial<SagaState>): Promise<void> {
const fields = Object.entries(updates)
.map(([key, _], i) => `${this.toSnakeCase(key)} = $${i + 2}`)
.join(', ');
await this.pool.query(
`UPDATE sagas SET ${fields}, updated_at = NOW() WHERE id = $1`,
[id, ...Object.values(updates).map(v =>
typeof v === 'object' ? JSON.stringify(v) : v
)]
);
}
async findById(id: string): Promise<SagaState | null> {
const result = await this.pool.query(
'SELECT * FROM sagas WHERE id = $1',
[id]
);
return result.rows[0] ? this.mapToSaga(result.rows[0]) : null;
}
async findPending(): Promise<SagaState[]> {
const result = await this.pool.query(
`SELECT * FROM sagas
WHERE status IN ('running', 'compensating')
AND updated_at < NOW() - INTERVAL '5 minutes'`
);
return result.rows.map(this.mapToSaga);
}
private toSnakeCase(str: string): string {
return str.replace(/[A-Z]/g, c => `_${c.toLowerCase()}`);
}
private mapToSaga(row: any): SagaState {
return {
id: row.id,
type: row.type,
status: row.status,
currentStep: row.current_step,
context: row.context,
completedSteps: row.completed_steps,
error: row.error,
createdAt: row.created_at,
updatedAt: row.updated_at
};
}
}
Saga con Estado Persistente
Una Stateful Saga (saga con estado) es una implementacion que guarda su progreso en cada paso. Esto permite:
- Recuperacion: Si el servidor falla, la saga puede continuar desde el ultimo paso completado
- Visibilidad: Podemos consultar el estado actual de cualquier saga
- Auditoria: Mantenemos un historial completo de la ejecucion
El metodo resume() es clave: permite reanudar una saga interrumpida, ya sea continuando su ejecucion normal o completando una compensacion pendiente.
// stateful-saga.ts
export class StatefulSaga<T extends Record<string, unknown>> {
constructor(
private repository: SagaRepository,
private steps: SagaStep<T>[]
) {}
async execute(id: string, input: T): Promise<void> {
const state: SagaState = {
id,
type: 'CREATE_ORDER',
status: 'running',
currentStep: 0,
context: input,
completedSteps: [],
createdAt: new Date(),
updatedAt: new Date()
};
await this.repository.create(state);
try {
await this.runSteps(state);
await this.repository.update(id, { status: 'completed' });
} catch (error) {
await this.compensate(state, error as Error);
}
}
private async runSteps(state: SagaState): Promise<void> {
for (let i = state.currentStep; i < this.steps.length; i++) {
const step = this.steps[i];
const result = await step.execute(state.context as T);
state.completedSteps.push({
name: step.name,
status: 'completed',
result,
completedAt: new Date()
});
await this.repository.update(state.id, {
currentStep: i + 1,
completedSteps: state.completedSteps,
context: state.context
});
}
}
private async compensate(state: SagaState, error: Error): Promise<void> {
await this.repository.update(state.id, {
status: 'compensating',
error: error.message
});
for (let i = state.completedSteps.length - 1; i >= 0; i--) {
const step = this.steps[i];
const stepResult = state.completedSteps[i];
try {
await step.compensate(state.context as T);
stepResult.status = 'compensated';
} catch (compError) {
console.error(`Compensation failed: ${step.name}`, compError);
}
await this.repository.update(state.id, {
completedSteps: state.completedSteps
});
}
await this.repository.update(state.id, { status: 'failed' });
}
async resume(id: string): Promise<void> {
const state = await this.repository.findById(id);
if (!state) throw new Error('Saga not found');
if (state.status === 'compensating') {
await this.compensate(state, new Error('Resumed compensation'));
} else if (state.status === 'running') {
try {
await this.runSteps(state);
await this.repository.update(id, { status: 'completed' });
} catch (error) {
await this.compensate(state, error as Error);
}
}
}
}
Cache con Redis
Redis es una base de datos en memoria que ofrece acceso ultrarapido a los datos. Lo usamos como cache (almacenamiento temporal) para:
- Lectura rapida: Evitar consultas frecuentes a PostgreSQL
- Locks distribuidos: Prevenir que dos procesos modifiquen la misma saga simultaneamente
- TTL (Time To Live): Los datos expiran automaticamente, liberando memoria
Un lock distribuido es un mecanismo que garantiza acceso exclusivo a un recurso en sistemas con multiples servidores. El comando SET ... NX de Redis crea la clave solo si no existe, funcionando como un cerrojo atomico.
// saga-cache.ts
import Redis from 'ioredis';
export class SagaCache {
private redis: Redis;
private ttl = 3600; // 1 hora
constructor(redisUrl: string) {
this.redis = new Redis(redisUrl);
}
async get(sagaId: string): Promise<SagaState | null> {
const data = await this.redis.get(`saga:${sagaId}`);
return data ? JSON.parse(data) : null;
}
async set(saga: SagaState): Promise<void> {
await this.redis.setex(
`saga:${saga.id}`,
this.ttl,
JSON.stringify(saga)
);
}
async acquireLock(sagaId: string, timeout = 30): Promise<boolean> {
const result = await this.redis.set(
`saga:lock:${sagaId}`,
'1',
'EX', timeout,
'NX'
);
return result === 'OK';
}
async releaseLock(sagaId: string): Promise<void> {
await this.redis.del(`saga:lock:${sagaId}`);
}
}
Recovery Worker
Un Recovery Worker (trabajador de recuperacion) es un proceso en segundo plano que periodicamente busca sagas “huerfanas” - aquellas que quedaron en estado running o compensating sin completarse.
Estas sagas huerfanas pueden ocurrir cuando:
- Un servidor falla durante la ejecucion
- Una conexion de red se pierde
- El proceso es terminado abruptamente
El worker usa un intervalo de tiempo (por defecto 60 segundos) para revisar la base de datos y reanudar cualquier saga pendiente.
// recovery-worker.ts
export class SagaRecoveryWorker {
constructor(
private repository: SagaRepository,
private sagaFactory: SagaFactory
) {}
async start(intervalMs = 60000): Promise<void> {
setInterval(() => this.recover(), intervalMs);
}
private async recover(): Promise<void> {
const pendingSagas = await this.repository.findPending();
for (const state of pendingSagas) {
console.log(`Recovering saga: ${state.id}`);
const saga = this.sagaFactory.create(state.type);
await saga.resume(state.id);
}
}
}
Resumen
- Persistir estado permite recuperar sagas tras fallos
- PostgreSQL para almacenamiento duradero
- Redis para cache y locks distribuidos
- Un recovery worker reanuda sagas huerfanas
Glosario
Persistencia de Estado
Definicion: Almacenar informacion sobre el progreso de una saga en un medio duradero que sobrevive a reinicios del sistema.
Por que es importante: Sin persistencia, si un servidor falla durante una saga, se pierde toda la informacion sobre que pasos se completaron, haciendo imposible recuperar o compensar correctamente.
Ejemplo practico: Guardar en PostgreSQL que la saga order-123 completo los pasos “crear orden” y “reservar stock” pero fallo en “procesar pago”.
Repository Pattern
Definicion: Patron de diseno que encapsula la logica de acceso a datos, proporcionando una interfaz limpia para operaciones CRUD sin exponer detalles de la base de datos.
Por que es importante: Separa la logica de negocio de la saga de los detalles de como se almacenan los datos, facilitando cambiar de base de datos o agregar cache sin modificar la saga.
Ejemplo practico: repository.findById(sagaId) devuelve una saga sin importar si viene de PostgreSQL, MongoDB o cache.
Lock Distribuido
Definicion: Mecanismo de sincronizacion que garantiza que solo un proceso puede acceder a un recurso compartido en un sistema con multiples servidores.
Por que es importante: Previene condiciones de carrera donde dos workers intenten procesar la misma saga simultaneamente, lo que podria causar compensaciones duplicadas.
Ejemplo practico: Antes de reanudar una saga, el worker adquiere un lock saga:lock:order-123. Si otro worker ya tiene el lock, espera o se salta esa saga.
TTL (Time To Live)
Definicion: Tiempo de vida de un dato en cache, tras el cual expira y se elimina automaticamente.
Por que es importante: Evita que el cache crezca indefinidamente y garantiza que los datos obsoletos se actualicen eventualmente desde la fuente principal.
Ejemplo practico: El estado de una saga en Redis tiene TTL de 1 hora. Si no se accede en ese tiempo, se elimina y la proxima lectura consultara PostgreSQL.
Recovery Worker
Definicion: Proceso en segundo plano que periodicamente busca y reanuda sagas que quedaron incompletas debido a fallos del sistema.
Por que es importante: Garantiza que ninguna saga quede abandonada indefinidamente, asegurando la consistencia eventual del sistema incluso ante fallos.
Ejemplo practico: Cada 60 segundos, el worker busca sagas con status = 'running' y updated_at < NOW() - 5 minutes, asumiendo que estan huerfanas y necesitan recuperacion.