Capítulo 19: Apache Kafka para Sagas
Capítulo 19: Apache Kafka para Sagas
“Kafka: event streaming para sagas de alta escala”
Introduccion
Apache Kafka es diferente a RabbitMQ. Mientras RabbitMQ es un message broker tradicional (los mensajes se eliminan despues de consumirse), Kafka es una plataforma de event streaming donde los eventos se persisten como un log inmutable.
Diferencias clave con RabbitMQ:
- Persistencia: Kafka retiene eventos por tiempo configurable (dias, semanas, siempre)
- Replay: Los consumidores pueden releer eventos pasados
- Particiones: Permiten paralelismo masivo manteniendo orden por clave
- Consumer Groups: Permiten escalar consumidores horizontalmente
Kafka es ideal para sistemas de alta escala donde millones de eventos por segundo son comunes.
Arquitectura con Kafka
El diagrama muestra la estructura: topics contienen mensajes, divididos en particiones. Los consumer groups permiten que multiples instancias de un servicio procesen en paralelo.
graph TD
P[Producer] --> T1[order-events topic]
P --> T2[inventory-events topic]
P --> T3[payment-events topic]
T1 --> CG1[Order Consumer Group]
T2 --> CG2[Inventory Consumer Group]
T3 --> CG3[Payment Consumer Group]
CG1 --> S1[Order Service]
CG2 --> S2[Inventory Service]
CG3 --> S3[Payment Service]
Configuracion de Topics
Un topic es un canal de eventos con nombre. Cada topic tiene:
- Particiones: Subdivisiones que permiten paralelismo. Los mensajes con la misma clave van a la misma particion
- Factor de replicacion: Cuantas copias de cada particion existen para alta disponibilidad
Usamos diferentes topics para separar tipos de eventos:
saga-events: Eventos de dominio (orden creada, pago procesado)saga-commands: Comandos para servicios (crear orden, procesar pago)saga-state: Cambios de estado de sagassaga-dlq: Eventos fallidos (Dead Letter Queue)
// kafka/setup.ts
import { Kafka, Admin } from 'kafkajs';
const kafka = new Kafka({
clientId: 'orderflow',
brokers: ['localhost:9092']
});
export async function setupTopics(): Promise<void> {
const admin = kafka.admin();
await admin.connect();
const topics = [
{ topic: 'saga-events', numPartitions: 6, replicationFactor: 1 },
{ topic: 'saga-commands', numPartitions: 6, replicationFactor: 1 },
{ topic: 'saga-state', numPartitions: 3, replicationFactor: 1 },
{ topic: 'saga-dlq', numPartitions: 1, replicationFactor: 1 }
];
await admin.createTopics({ topics });
await admin.disconnect();
}
export { kafka };
Producer de Eventos
El Producer envia mensajes a topics de Kafka. Configuraciones importantes:
- idempotent: true: Garantiza que cada mensaje se escriba exactamente una vez, incluso con reintentos
- key: Los mensajes con la misma clave van a la misma particion, garantizando orden
- headers: Metadata adicional como tipo de evento y ID de saga
Usamos el sagaId como clave para que todos los eventos de una saga vayan a la misma particion y se procesen en orden.
// kafka/producer.ts
import { kafka } from './setup';
import { Producer, Message } from 'kafkajs';
export interface SagaEvent {
type: string;
sagaId: string;
orderId?: string;
payload: Record<string, unknown>;
timestamp: number;
version: number;
}
export class SagaEventProducer {
private producer: Producer;
constructor() {
this.producer = kafka.producer({
idempotent: true,
maxInFlightRequests: 5
});
}
async connect(): Promise<void> {
await this.producer.connect();
}
async publish(event: SagaEvent): Promise<void> {
const message: Message = {
key: event.sagaId,
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'saga-id': event.sagaId
}
};
await this.producer.send({
topic: 'saga-events',
messages: [message]
});
}
async publishBatch(events: SagaEvent[]): Promise<void> {
const messages = events.map(event => ({
key: event.sagaId,
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'saga-id': event.sagaId
}
}));
await this.producer.send({
topic: 'saga-events',
messages
});
}
async disconnect(): Promise<void> {
await this.producer.disconnect();
}
}
Consumer Base
El Consumer de Kafka es diferente a RabbitMQ:
- Consumer Group: Identificador que agrupa consumidores. Kafka distribuye particiones entre miembros del grupo
- Session Timeout: Si un consumer no envia heartbeats en este tiempo, se considera muerto y sus particiones se reasignan
- fromBeginning: Si es true, lee desde el inicio del topic; si es false, solo eventos nuevos
El flujo: suscribirse a topics, ejecutar el loop de consumo, procesar cada mensaje, manejar errores enviando a DLQ.
// kafka/consumer.ts
import { kafka } from './setup';
import { Consumer, EachMessagePayload } from 'kafkajs';
import { SagaEvent } from './producer';
export abstract class SagaEventConsumer {
protected consumer: Consumer;
constructor(groupId: string) {
this.consumer = kafka.consumer({
groupId,
sessionTimeout: 30000,
heartbeatInterval: 3000
});
}
async start(topics: string[]): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topics, fromBeginning: false });
await this.consumer.run({
eachMessage: async (payload) => {
await this.handleMessage(payload);
}
});
}
private async handleMessage(payload: EachMessagePayload): Promise<void> {
const { topic, partition, message } = payload;
if (!message.value) return;
const event: SagaEvent = JSON.parse(message.value.toString());
try {
await this.processEvent(event);
} catch (error) {
await this.handleError(event, error as Error);
}
}
protected abstract processEvent(event: SagaEvent): Promise<void>;
protected async handleError(event: SagaEvent, error: Error): Promise<void> {
// Publicar a DLQ
const dlqProducer = kafka.producer();
await dlqProducer.connect();
await dlqProducer.send({
topic: 'saga-dlq',
messages: [{
key: event.sagaId,
value: JSON.stringify({ event, error: error.message })
}]
});
await dlqProducer.disconnect();
}
async stop(): Promise<void> {
await this.consumer.disconnect();
}
}
Implementacion en Go
La implementacion en Go usa la biblioteca kafka-go de Segment. El patron es similar:
- Reader: Consume mensajes de un topic
- Handlers Map: Mapea tipos de evento a funciones que los procesan
- MinBytes/MaxBytes: Controlan batching para eficiencia
Go es popular para consumidores de Kafka porque su modelo de concurrencia (goroutines) maneja eficientemente muchas conexiones simultaneas.
// kafka/consumer.go
package kafka
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
)
type SagaEvent struct {
Type string `json:"type"`
SagaID string `json:"sagaId"`
OrderID string `json:"orderId,omitempty"`
Payload map[string]interface{} `json:"payload"`
Timestamp int64 `json:"timestamp"`
Version int `json:"version"`
}
type EventHandler func(event SagaEvent) error
type SagaConsumer struct {
reader *kafka.Reader
handlers map[string]EventHandler
}
func NewSagaConsumer(brokers []string, groupID, topic string) *SagaConsumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 10e3,
MaxBytes: 10e6,
})
return &SagaConsumer{
reader: reader,
handlers: make(map[string]EventHandler),
}
}
func (c *SagaConsumer) RegisterHandler(eventType string, handler EventHandler) {
c.handlers[eventType] = handler
}
func (c *SagaConsumer) Start(ctx context.Context) error {
for {
msg, err := c.reader.ReadMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return nil
}
log.Printf("Error reading message: %v", err)
continue
}
var event SagaEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("Error unmarshaling event: %v", err)
continue
}
handler, exists := c.handlers[event.Type]
if !exists {
log.Printf("No handler for event type: %s", event.Type)
continue
}
if err := handler(event); err != nil {
log.Printf("Error handling event: %v", err)
// Enviar a DLQ
}
}
}
func (c *SagaConsumer) Close() error {
return c.reader.Close()
}
Saga Orchestrator con Kafka Streams
El orquestador usa un State Store (almacen de estado) en memoria para rastrear sagas activas. En produccion, esto seria un KTable o una base de datos externa.
El flujo del orquestador:
- Recibe eventos
*_COMPLETEDo*_FAILED - Busca el estado de la saga correspondiente
- Si completado: avanza al siguiente paso o finaliza
- Si fallido: inicia compensaciones en orden inverso
La secuencia de pasos (stepSequence) define tanto la operacion como su compensacion correspondiente.
// saga/kafka-orchestrator.ts
import { SagaEventConsumer } from '../kafka/consumer';
import { SagaEventProducer, SagaEvent } from '../kafka/producer';
interface SagaState {
sagaId: string;
status: 'running' | 'completed' | 'compensating' | 'failed';
currentStep: number;
completedSteps: string[];
context: Record<string, unknown>;
}
export class KafkaSagaOrchestrator extends SagaEventConsumer {
private producer: SagaEventProducer;
private stateStore: Map<string, SagaState> = new Map();
private stepSequence = [
{ name: 'CREATE_ORDER', compensate: 'CANCEL_ORDER' },
{ name: 'RESERVE_STOCK', compensate: 'RELEASE_STOCK' },
{ name: 'PROCESS_PAYMENT', compensate: 'REFUND_PAYMENT' },
{ name: 'SCHEDULE_SHIPPING', compensate: 'CANCEL_SHIPPING' }
];
constructor() {
super('saga-orchestrator');
this.producer = new SagaEventProducer();
}
async initialize(): Promise<void> {
await this.producer.connect();
await this.start(['saga-events']);
}
protected async processEvent(event: SagaEvent): Promise<void> {
const state = this.stateStore.get(event.sagaId);
if (!state) return;
if (event.type.endsWith('_COMPLETED')) {
await this.handleStepCompleted(state, event);
} else if (event.type.endsWith('_FAILED')) {
await this.handleStepFailed(state, event);
}
}
private async handleStepCompleted(state: SagaState, event: SagaEvent): Promise<void> {
const stepName = event.type.replace('_COMPLETED', '');
state.completedSteps.push(stepName);
state.currentStep++;
state.context = { ...state.context, ...event.payload };
if (state.currentStep >= this.stepSequence.length) {
state.status = 'completed';
await this.producer.publish({
type: 'SAGA_COMPLETED',
sagaId: state.sagaId,
payload: state.context,
timestamp: Date.now(),
version: 1
});
} else {
const nextStep = this.stepSequence[state.currentStep];
await this.producer.publish({
type: `${nextStep.name}_COMMAND`,
sagaId: state.sagaId,
payload: state.context,
timestamp: Date.now(),
version: 1
});
}
this.stateStore.set(state.sagaId, state);
}
private async handleStepFailed(state: SagaState, event: SagaEvent): Promise<void> {
state.status = 'compensating';
for (let i = state.completedSteps.length - 1; i >= 0; i--) {
const step = this.stepSequence.find(s => s.name === state.completedSteps[i]);
if (step) {
await this.producer.publish({
type: `${step.compensate}_COMMAND`,
sagaId: state.sagaId,
payload: state.context,
timestamp: Date.now(),
version: 1
});
}
}
state.status = 'failed';
this.stateStore.set(state.sagaId, state);
}
async startSaga(sagaId: string, initialData: Record<string, unknown>): Promise<void> {
const state: SagaState = {
sagaId,
status: 'running',
currentStep: 0,
completedSteps: [],
context: initialData
};
this.stateStore.set(sagaId, state);
await this.producer.publish({
type: 'CREATE_ORDER_COMMAND',
sagaId,
payload: initialData,
timestamp: Date.now(),
version: 1
});
}
}
Resumen
- Kafka para alto throughput y durabilidad
- Consumer Groups para escalado horizontal
- Particiones por sagaId garantizan orden
- DLQ para eventos fallidos
- Idempotencia evita duplicados
Glosario
Apache Kafka
Definicion: Plataforma distribuida de event streaming que persiste eventos como un log inmutable, permitiendo procesamiento en tiempo real y replay de datos historicos.
Por que es importante: Maneja millones de eventos por segundo con baja latencia, ideal para sistemas de alta escala donde el historial de eventos es valioso.
Ejemplo practico: Una saga que procesa 10,000 pedidos por minuto usa Kafka para garantizar que ningun evento se pierda y todos se procesen en orden.
Topic (Kafka)
Definicion: Canal de eventos con nombre donde los productores escriben y los consumidores leen. Los topics se dividen en particiones para paralelismo.
Por que es importante: Organiza eventos por categoria o dominio, permitiendo que diferentes servicios se suscriban solo a los topics relevantes.
Ejemplo practico: El topic payment-events contiene todos los eventos de pagos (iniciado, completado, fallido), mientras inventory-events contiene eventos de inventario.
Particion (Kafka)
Definicion: Subdivision de un topic que permite procesamiento paralelo. Los mensajes con la misma clave siempre van a la misma particion.
Por que es importante: Permite escalar horizontalmente manteniendo orden por clave. Cada particion puede ser procesada por un consumer diferente.
Ejemplo practico: Un topic con 6 particiones puede ser procesado por 6 consumers simultaneamente. Todos los eventos de saga-123 van a la particion 2 y se procesan en orden.
Consumer Group
Definicion: Grupo de consumidores que comparten el trabajo de procesar un topic. Kafka distribuye particiones entre los miembros del grupo.
Por que es importante: Permite escalar consumidores horizontalmente. Agregar mas consumers al grupo aumenta el throughput automaticamente.
Ejemplo practico: El grupo order-service tiene 3 instancias. Kafka asigna 2 particiones a cada una, distribuyendo la carga equitativamente.
Offset
Definicion: Posicion de un mensaje dentro de una particion. Los consumers rastrean su offset para saber que mensajes ya procesaron.
Por que es importante: Permite reanudar el consumo desde donde se dejo, y habilita replay de eventos moviendo el offset hacia atras.
Ejemplo practico: El consumer fallo en el offset 1000. Al reiniciar, continua desde 1000 en lugar de reprocesar los 999 mensajes anteriores.
Idempotencia (Producer)
Definicion: Garantia de que cada mensaje se escribe exactamente una vez al topic, incluso si hay reintentos por fallos de red.
Por que es importante: Evita mensajes duplicados que podrian causar efectos secundarios no deseados como cobros dobles.
Ejemplo practico: El producer envia “cobrar $100” pero no recibe confirmacion por timeout. Reintenta, pero Kafka detecta el duplicado y no lo escribe de nuevo.
Event Streaming
Definicion: Paradigma donde los eventos se tratan como un flujo continuo de datos que se procesan en tiempo real y se persisten para analisis futuro.
Por que es importante: Permite construir sistemas reactivos que responden inmediatamente a eventos, y habilita casos de uso como replay, auditoria y analytics.
Ejemplo practico: Ademas de procesar pagos en tiempo real, podemos analizar patrones de compra del ultimo mes leyendo eventos historicos del topic.
Log Inmutable
Definicion: Estructura de datos donde los registros solo se agregan al final y nunca se modifican o eliminan (hasta que expiran por retencion).
Por que es importante: Proporciona un historial completo y confiable de eventos, util para auditoria, debugging y reconstruccion de estado.
Ejemplo practico: Si hay una disputa sobre un pedido, podemos ver exactamente que eventos ocurrieron y en que orden, reconstruyendo la historia completa.