Capítulo 18: RabbitMQ para Sagas
Capítulo 18: RabbitMQ para Sagas
“RabbitMQ: mensajeria confiable para sagas distribuidas”
Introduccion
Hasta ahora hemos usado Redis como broker. Ahora exploramos RabbitMQ, un message broker empresarial que ofrece caracteristicas avanzadas como ruteo flexible, mensajes persistentes y confirmaciones de entrega.
RabbitMQ implementa el protocolo AMQP (Advanced Message Queuing Protocol), un estandar abierto para mensajeria. Sus caracteristicas principales son:
- Exchanges: Puntos de entrada que rutean mensajes a colas
- Queues: Almacenan mensajes hasta que son consumidos
- Bindings: Reglas que conectan exchanges con queues
Esta arquitectura permite patrones de comunicacion sofisticados, ideales para sagas con multiples servicios.
Arquitectura con RabbitMQ
El diagrama muestra el flujo de eventos. Un exchange tipo topic permite ruteo basado en patrones, donde order.* coincide con order.created, order.cancelled, etc.
graph LR
O[Order Service] --> EX{Exchange}
EX --> Q1[order.created queue]
EX --> Q2[stock.reserved queue]
EX --> Q3[payment.processed queue]
Q1 --> IS[Inventory Service]
Q2 --> PS[Payment Service]
Q3 --> SS[Shipping Service]
Configuracion de Exchanges y Queues
La configuracion inicial crea la topologia de mensajeria:
- Exchange principal:
saga.eventscon tipotopicpara ruteo flexible - Queues por servicio: Cada servicio tiene su cola para recibir eventos relevantes
- Bindings: Conectan el exchange a las queues usando patrones de routing key
- Dead Letter Queue (DLQ): Recibe mensajes que no pudieron procesarse
Las opciones { durable: true } garantizan que exchanges y queues sobrevivan a reinicios del broker.
// rabbitmq/setup.ts
import amqp, { Channel, Connection } from 'amqplib';
export async function setupRabbitMQ(): Promise<Channel> {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Exchange principal para eventos de saga
await channel.assertExchange('saga.events', 'topic', { durable: true });
// Queues por servicio
const queues = [
{ name: 'order.events', pattern: 'order.*' },
{ name: 'inventory.events', pattern: 'inventory.*' },
{ name: 'payment.events', pattern: 'payment.*' },
{ name: 'shipping.events', pattern: 'shipping.*' },
{ name: 'saga.compensations', pattern: '*.failed' }
];
for (const q of queues) {
await channel.assertQueue(q.name, { durable: true });
await channel.bindQueue(q.name, 'saga.events', q.pattern);
}
// Dead Letter Queue para mensajes fallidos
await channel.assertExchange('saga.dlx', 'topic', { durable: true });
await channel.assertQueue('saga.dead-letter', { durable: true });
await channel.bindQueue('saga.dead-letter', 'saga.dlx', '#');
return channel;
}
Event Publisher
El Publisher envia eventos al exchange. Caracteristicas importantes:
- Routing key: Determina a que queues llega el mensaje (ej:
order.created) - persistent: true: El mensaje se escribe a disco, sobreviviendo a reinicios
- correlationId: Permite rastrear mensajes relacionados a traves del sistema
- headers: Metadata adicional como el ID de la saga
El metodo publishCompensation es un helper para publicar eventos de fallo que disparan compensaciones.
// rabbitmq/publisher.ts
import { Channel } from 'amqplib';
export interface SagaEvent {
type: string;
sagaId: string;
orderId?: string;
payload: Record<string, unknown>;
timestamp: Date;
correlationId: string;
}
export class EventPublisher {
constructor(private channel: Channel) {}
async publish(event: SagaEvent): Promise<void> {
const routingKey = event.type.toLowerCase().replace('_', '.');
this.channel.publish(
'saga.events',
routingKey,
Buffer.from(JSON.stringify(event)),
{
persistent: true,
contentType: 'application/json',
correlationId: event.correlationId,
timestamp: Date.now(),
headers: {
sagaId: event.sagaId
}
}
);
}
async publishCompensation(sagaId: string, step: string, reason: string): Promise<void> {
await this.publish({
type: `${step}.FAILED`,
sagaId,
payload: { reason },
timestamp: new Date(),
correlationId: sagaId
});
}
}
Event Consumer Base
El Consumer base proporciona la logica comun para procesar mensajes:
- prefetch(1): Solo recibe un mensaje a la vez, garantizando procesamiento ordenado
- ack(): Confirma al broker que el mensaje fue procesado exitosamente
- reject(msg, false): Rechaza el mensaje sin reencolarlo (va a DLQ)
El manejo de errores implementa reintentos con backoff exponencial: si falla, espera 2^n segundos antes de reintentar. Despues de 3 intentos, el mensaje va a la Dead Letter Queue.
El header x-retry-count rastrea cuantas veces se ha reintentado el mensaje.
// rabbitmq/consumer.ts
import { Channel, ConsumeMessage } from 'amqplib';
import { SagaEvent } from './publisher';
export abstract class EventConsumer {
constructor(
protected channel: Channel,
protected queueName: string
) {}
async start(): Promise<void> {
await this.channel.prefetch(1);
this.channel.consume(this.queueName, async (msg) => {
if (!msg) return;
try {
const event: SagaEvent = JSON.parse(msg.content.toString());
await this.handleEvent(event, msg);
this.channel.ack(msg);
} catch (error) {
await this.handleError(msg, error as Error);
}
});
}
protected abstract handleEvent(event: SagaEvent, msg: ConsumeMessage): Promise<void>;
private async handleError(msg: ConsumeMessage, error: Error): Promise<void> {
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Reencolar con delay
setTimeout(() => {
this.channel.publish('', this.queueName, msg.content, {
...msg.properties,
headers: { ...msg.properties.headers, 'x-retry-count': retryCount }
});
}, Math.pow(2, retryCount) * 1000);
this.channel.ack(msg);
} else {
// Enviar a DLQ
this.channel.reject(msg, false);
}
}
}
Inventory Consumer
Cada servicio implementa su propio consumer que hereda de EventConsumer. El metodo handleEvent usa un switch para manejar diferentes tipos de eventos.
Este consumer reacciona a:
ORDER.CREATED: Reserva stock para el nuevo pedidoORDER.CANCELLED/PAYMENT.FAILED: Libera el stock reservado (compensacion)
Despues de procesar, publica el resultado como un nuevo evento, permitiendo que otros servicios reaccionen (patron Event-Driven Architecture).
// services/inventory-consumer.ts
import { EventConsumer } from '../rabbitmq/consumer';
import { EventPublisher, SagaEvent } from '../rabbitmq/publisher';
import { Channel, ConsumeMessage } from 'amqplib';
export class InventoryConsumer extends EventConsumer {
constructor(channel: Channel, private publisher: EventPublisher) {
super(channel, 'inventory.events');
}
protected async handleEvent(event: SagaEvent, msg: ConsumeMessage): Promise<void> {
switch (event.type) {
case 'ORDER.CREATED':
await this.reserveStock(event);
break;
case 'ORDER.CANCELLED':
case 'PAYMENT.FAILED':
await this.releaseStock(event);
break;
}
}
private async reserveStock(event: SagaEvent): Promise<void> {
const { orderId, items } = event.payload as { orderId: string; items: any[] };
try {
// Verificar y reservar stock
const reservationId = await this.inventoryService.reserve(orderId, items);
await this.publisher.publish({
type: 'INVENTORY.RESERVED',
sagaId: event.sagaId,
orderId,
payload: { reservationId },
timestamp: new Date(),
correlationId: event.correlationId
});
} catch (error) {
await this.publisher.publishCompensation(
event.sagaId,
'INVENTORY',
(error as Error).message
);
}
}
private async releaseStock(event: SagaEvent): Promise<void> {
const { reservationId } = event.payload as { reservationId: string };
await this.inventoryService.release(reservationId);
await this.publisher.publish({
type: 'INVENTORY.RELEASED',
sagaId: event.sagaId,
payload: { reservationId },
timestamp: new Date(),
correlationId: event.correlationId
});
}
}
Saga Coordinator
El Saga Coordinator es un servicio especial que escucha eventos de fallo (*.failed) y coordina las compensaciones.
Cuando recibe un evento de fallo:
- Busca el estado de la saga en el repositorio
- Identifica que pasos se completaron
- Publica eventos de compensacion para cada paso en orden inverso
- Actualiza el estado final de la saga
Este patron centraliza la logica de compensacion, aunque tambien podria distribuirse entre los servicios (cada uno escuchando fallos y compensando lo suyo).
// saga/coordinator.ts
import { EventConsumer } from '../rabbitmq/consumer';
import { EventPublisher, SagaEvent } from '../rabbitmq/publisher';
import { Channel, ConsumeMessage } from 'amqplib';
import { SagaRepository, SagaState } from './repository';
export class SagaCoordinator extends EventConsumer {
constructor(
channel: Channel,
private publisher: EventPublisher,
private repository: SagaRepository
) {
super(channel, 'saga.compensations');
}
protected async handleEvent(event: SagaEvent, msg: ConsumeMessage): Promise<void> {
const saga = await this.repository.findById(event.sagaId);
if (!saga) return;
await this.repository.update(saga.id, { status: 'compensating' });
// Determinar qué pasos compensar
const completedSteps = saga.completedSteps;
for (const step of completedSteps.reverse()) {
await this.compensateStep(saga, step);
}
await this.repository.update(saga.id, { status: 'failed' });
}
private async compensateStep(saga: SagaState, step: string): Promise<void> {
const compensationEvents: Record<string, string> = {
'order': 'ORDER.CANCELLED',
'inventory': 'INVENTORY.RELEASE_REQUESTED',
'payment': 'PAYMENT.REFUND_REQUESTED'
};
const eventType = compensationEvents[step];
if (!eventType) return;
await this.publisher.publish({
type: eventType,
sagaId: saga.id,
payload: saga.context,
timestamp: new Date(),
correlationId: saga.id
});
}
}
Iniciar Servicios
// main.ts
import { setupRabbitMQ } from './rabbitmq/setup';
import { EventPublisher } from './rabbitmq/publisher';
import { InventoryConsumer } from './services/inventory-consumer';
import { PaymentConsumer } from './services/payment-consumer';
import { SagaCoordinator } from './saga/coordinator';
async function main() {
const channel = await setupRabbitMQ();
const publisher = new EventPublisher(channel);
// Iniciar consumers
const inventoryConsumer = new InventoryConsumer(channel, publisher);
const paymentConsumer = new PaymentConsumer(channel, publisher);
const coordinator = new SagaCoordinator(channel, publisher, repository);
await Promise.all([
inventoryConsumer.start(),
paymentConsumer.start(),
coordinator.start()
]);
console.log('All consumers started');
}
main().catch(console.error);
Resumen
- Exchanges topic para ruteo flexible de eventos
- Queues durables garantizan persistencia
- Dead Letter Queue para mensajes fallidos
- Prefetch 1 para procesamiento ordenado
- Coordinator centraliza logica de compensacion
Glosario
RabbitMQ
Definicion: Message broker open-source que implementa el protocolo AMQP, proporcionando colas de mensajes confiables con ruteo flexible y confirmaciones de entrega.
Por que es importante: Ofrece garantias de entrega mas fuertes que Redis, con persistencia, clustering y features empresariales para sistemas de mision critica.
Ejemplo practico: Un mensaje de pago se persiste en disco. Aunque RabbitMQ se reinicie, el mensaje no se pierde y se entrega al consumer cuando vuelve a estar disponible.
Exchange
Definicion: Componente de RabbitMQ que recibe mensajes de productores y los rutea a una o mas colas segun reglas de binding y routing keys.
Por que es importante: Desacopla productores de consumidores. El productor no necesita saber que colas existen; solo envia al exchange con una routing key.
Ejemplo practico: El exchange saga.events con tipo topic recibe un mensaje con routing key order.created y lo envia a todas las colas que tienen binding para order.*.
Queue (RabbitMQ)
Definicion: Buffer que almacena mensajes hasta que un consumidor los procesa. Las colas pueden ser durables (sobreviven reinicios) o transitorias.
Por que es importante: Absorben picos de carga y garantizan que los mensajes no se pierdan si los consumidores estan temporalmente no disponibles.
Ejemplo practico: La cola payment.events tiene 100 mensajes pendientes. El consumer procesa 10 por segundo, vaciando la cola en 10 segundos sin perder ninguno.
Binding
Definicion: Regla que conecta un exchange con una cola, especificando que mensajes (basado en routing key o headers) deben llegar a esa cola.
Por que es importante: Permite filtrar mensajes para que cada servicio solo reciba los eventos que le interesan, reduciendo procesamiento innecesario.
Ejemplo practico: El binding inventory.events <- inventory.* hace que solo eventos como inventory.reserved o inventory.released lleguen al servicio de inventario.
Dead Letter Queue (DLQ)
Definicion: Cola especial que recibe mensajes que no pudieron procesarse despues de multiples intentos, permitiendo analisis posterior sin bloquear el flujo principal.
Por que es importante: Evita que mensajes problematicos bloqueen indefinidamente a los consumers, mientras preserva los datos para debugging y reprocesamiento manual.
Ejemplo practico: Un mensaje con datos corruptos falla 3 veces y va a la DLQ. Un operador revisa la DLQ, corrige el problema y reencola el mensaje.
Message Acknowledgement (ack)
Definicion: Confirmacion que un consumer envia al broker indicando que un mensaje fue procesado exitosamente y puede eliminarse de la cola.
Por que es importante: Garantiza entrega “at-least-once”: si el consumer falla antes de hacer ack, el mensaje se reentrega a otro consumer.
Ejemplo practico: El consumer procesa un pago, actualiza la DB, y luego llama channel.ack(msg). Solo entonces RabbitMQ elimina el mensaje de la cola.
Prefetch Count
Definicion: Numero maximo de mensajes no confirmados que RabbitMQ enviara a un consumer antes de esperar acknowledgements.
Por que es importante: Controla la carga en el consumer y la distribucion de trabajo. prefetch(1) garantiza procesamiento secuencial; valores mayores mejoran throughput.
Ejemplo practico: Con prefetch(10), un consumer rapido recibe 10 mensajes mientras uno lento solo tiene 1, distribuyendo trabajo de forma mas justa.
Routing Key
Definicion: String asociado a cada mensaje que los exchanges usan para decidir a que colas rutear el mensaje.
Por que es importante: Permite implementar patrones de ruteo sofisticados como pub/sub selectivo o RPC sin modificar productores o consumidores.
Ejemplo practico: order.created, order.paid, order.shipped son routing keys que permiten a diferentes servicios suscribirse solo a eventos relevantes.