← Volver al listado de tecnologías

Capítulo 19: Apache Kafka para Sagas

Por: SiempreListo
sagakafkaevent-streamingtypescriptgo

Capítulo 19: Apache Kafka para Sagas

“Kafka: event streaming para sagas de alta escala”

Introduccion

Apache Kafka es diferente a RabbitMQ. Mientras RabbitMQ es un message broker tradicional (los mensajes se eliminan despues de consumirse), Kafka es una plataforma de event streaming donde los eventos se persisten como un log inmutable.

Diferencias clave con RabbitMQ:

Kafka es ideal para sistemas de alta escala donde millones de eventos por segundo son comunes.

Arquitectura con Kafka

El diagrama muestra la estructura: topics contienen mensajes, divididos en particiones. Los consumer groups permiten que multiples instancias de un servicio procesen en paralelo.

graph TD
    P[Producer] --> T1[order-events topic]
    P --> T2[inventory-events topic]
    P --> T3[payment-events topic]

    T1 --> CG1[Order Consumer Group]
    T2 --> CG2[Inventory Consumer Group]
    T3 --> CG3[Payment Consumer Group]

    CG1 --> S1[Order Service]
    CG2 --> S2[Inventory Service]
    CG3 --> S3[Payment Service]

Configuracion de Topics

Un topic es un canal de eventos con nombre. Cada topic tiene:

Usamos diferentes topics para separar tipos de eventos:

// kafka/setup.ts
import { Kafka, Admin } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'orderflow',
  brokers: ['localhost:9092']
});

export async function setupTopics(): Promise<void> {
  const admin = kafka.admin();
  await admin.connect();

  const topics = [
    { topic: 'saga-events', numPartitions: 6, replicationFactor: 1 },
    { topic: 'saga-commands', numPartitions: 6, replicationFactor: 1 },
    { topic: 'saga-state', numPartitions: 3, replicationFactor: 1 },
    { topic: 'saga-dlq', numPartitions: 1, replicationFactor: 1 }
  ];

  await admin.createTopics({ topics });
  await admin.disconnect();
}

export { kafka };

Producer de Eventos

El Producer envia mensajes a topics de Kafka. Configuraciones importantes:

Usamos el sagaId como clave para que todos los eventos de una saga vayan a la misma particion y se procesen en orden.

// kafka/producer.ts
import { kafka } from './setup';
import { Producer, Message } from 'kafkajs';

export interface SagaEvent {
  type: string;
  sagaId: string;
  orderId?: string;
  payload: Record<string, unknown>;
  timestamp: number;
  version: number;
}

export class SagaEventProducer {
  private producer: Producer;

  constructor() {
    this.producer = kafka.producer({
      idempotent: true,
      maxInFlightRequests: 5
    });
  }

  async connect(): Promise<void> {
    await this.producer.connect();
  }

  async publish(event: SagaEvent): Promise<void> {
    const message: Message = {
      key: event.sagaId,
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'saga-id': event.sagaId
      }
    };

    await this.producer.send({
      topic: 'saga-events',
      messages: [message]
    });
  }

  async publishBatch(events: SagaEvent[]): Promise<void> {
    const messages = events.map(event => ({
      key: event.sagaId,
      value: JSON.stringify(event),
      headers: {
        'event-type': event.type,
        'saga-id': event.sagaId
      }
    }));

    await this.producer.send({
      topic: 'saga-events',
      messages
    });
  }

  async disconnect(): Promise<void> {
    await this.producer.disconnect();
  }
}

Consumer Base

El Consumer de Kafka es diferente a RabbitMQ:

El flujo: suscribirse a topics, ejecutar el loop de consumo, procesar cada mensaje, manejar errores enviando a DLQ.

// kafka/consumer.ts
import { kafka } from './setup';
import { Consumer, EachMessagePayload } from 'kafkajs';
import { SagaEvent } from './producer';

export abstract class SagaEventConsumer {
  protected consumer: Consumer;

  constructor(groupId: string) {
    this.consumer = kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000
    });
  }

  async start(topics: string[]): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics, fromBeginning: false });

    await this.consumer.run({
      eachMessage: async (payload) => {
        await this.handleMessage(payload);
      }
    });
  }

  private async handleMessage(payload: EachMessagePayload): Promise<void> {
    const { topic, partition, message } = payload;

    if (!message.value) return;

    const event: SagaEvent = JSON.parse(message.value.toString());

    try {
      await this.processEvent(event);
    } catch (error) {
      await this.handleError(event, error as Error);
    }
  }

  protected abstract processEvent(event: SagaEvent): Promise<void>;

  protected async handleError(event: SagaEvent, error: Error): Promise<void> {
    // Publicar a DLQ
    const dlqProducer = kafka.producer();
    await dlqProducer.connect();
    await dlqProducer.send({
      topic: 'saga-dlq',
      messages: [{
        key: event.sagaId,
        value: JSON.stringify({ event, error: error.message })
      }]
    });
    await dlqProducer.disconnect();
  }

  async stop(): Promise<void> {
    await this.consumer.disconnect();
  }
}

Implementacion en Go

La implementacion en Go usa la biblioteca kafka-go de Segment. El patron es similar:

Go es popular para consumidores de Kafka porque su modelo de concurrencia (goroutines) maneja eficientemente muchas conexiones simultaneas.

// kafka/consumer.go
package kafka

import (
    "context"
    "encoding/json"
    "log"
    "github.com/segmentio/kafka-go"
)

type SagaEvent struct {
    Type      string                 `json:"type"`
    SagaID    string                 `json:"sagaId"`
    OrderID   string                 `json:"orderId,omitempty"`
    Payload   map[string]interface{} `json:"payload"`
    Timestamp int64                  `json:"timestamp"`
    Version   int                    `json:"version"`
}

type EventHandler func(event SagaEvent) error

type SagaConsumer struct {
    reader   *kafka.Reader
    handlers map[string]EventHandler
}

func NewSagaConsumer(brokers []string, groupID, topic string) *SagaConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  brokers,
        GroupID:  groupID,
        Topic:    topic,
        MinBytes: 10e3,
        MaxBytes: 10e6,
    })

    return &SagaConsumer{
        reader:   reader,
        handlers: make(map[string]EventHandler),
    }
}

func (c *SagaConsumer) RegisterHandler(eventType string, handler EventHandler) {
    c.handlers[eventType] = handler
}

func (c *SagaConsumer) Start(ctx context.Context) error {
    for {
        msg, err := c.reader.ReadMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return nil
            }
            log.Printf("Error reading message: %v", err)
            continue
        }

        var event SagaEvent
        if err := json.Unmarshal(msg.Value, &event); err != nil {
            log.Printf("Error unmarshaling event: %v", err)
            continue
        }

        handler, exists := c.handlers[event.Type]
        if !exists {
            log.Printf("No handler for event type: %s", event.Type)
            continue
        }

        if err := handler(event); err != nil {
            log.Printf("Error handling event: %v", err)
            // Enviar a DLQ
        }
    }
}

func (c *SagaConsumer) Close() error {
    return c.reader.Close()
}

Saga Orchestrator con Kafka Streams

El orquestador usa un State Store (almacen de estado) en memoria para rastrear sagas activas. En produccion, esto seria un KTable o una base de datos externa.

El flujo del orquestador:

  1. Recibe eventos *_COMPLETED o *_FAILED
  2. Busca el estado de la saga correspondiente
  3. Si completado: avanza al siguiente paso o finaliza
  4. Si fallido: inicia compensaciones en orden inverso

La secuencia de pasos (stepSequence) define tanto la operacion como su compensacion correspondiente.

// saga/kafka-orchestrator.ts
import { SagaEventConsumer } from '../kafka/consumer';
import { SagaEventProducer, SagaEvent } from '../kafka/producer';

interface SagaState {
  sagaId: string;
  status: 'running' | 'completed' | 'compensating' | 'failed';
  currentStep: number;
  completedSteps: string[];
  context: Record<string, unknown>;
}

export class KafkaSagaOrchestrator extends SagaEventConsumer {
  private producer: SagaEventProducer;
  private stateStore: Map<string, SagaState> = new Map();

  private stepSequence = [
    { name: 'CREATE_ORDER', compensate: 'CANCEL_ORDER' },
    { name: 'RESERVE_STOCK', compensate: 'RELEASE_STOCK' },
    { name: 'PROCESS_PAYMENT', compensate: 'REFUND_PAYMENT' },
    { name: 'SCHEDULE_SHIPPING', compensate: 'CANCEL_SHIPPING' }
  ];

  constructor() {
    super('saga-orchestrator');
    this.producer = new SagaEventProducer();
  }

  async initialize(): Promise<void> {
    await this.producer.connect();
    await this.start(['saga-events']);
  }

  protected async processEvent(event: SagaEvent): Promise<void> {
    const state = this.stateStore.get(event.sagaId);
    if (!state) return;

    if (event.type.endsWith('_COMPLETED')) {
      await this.handleStepCompleted(state, event);
    } else if (event.type.endsWith('_FAILED')) {
      await this.handleStepFailed(state, event);
    }
  }

  private async handleStepCompleted(state: SagaState, event: SagaEvent): Promise<void> {
    const stepName = event.type.replace('_COMPLETED', '');
    state.completedSteps.push(stepName);
    state.currentStep++;
    state.context = { ...state.context, ...event.payload };

    if (state.currentStep >= this.stepSequence.length) {
      state.status = 'completed';
      await this.producer.publish({
        type: 'SAGA_COMPLETED',
        sagaId: state.sagaId,
        payload: state.context,
        timestamp: Date.now(),
        version: 1
      });
    } else {
      const nextStep = this.stepSequence[state.currentStep];
      await this.producer.publish({
        type: `${nextStep.name}_COMMAND`,
        sagaId: state.sagaId,
        payload: state.context,
        timestamp: Date.now(),
        version: 1
      });
    }

    this.stateStore.set(state.sagaId, state);
  }

  private async handleStepFailed(state: SagaState, event: SagaEvent): Promise<void> {
    state.status = 'compensating';

    for (let i = state.completedSteps.length - 1; i >= 0; i--) {
      const step = this.stepSequence.find(s => s.name === state.completedSteps[i]);
      if (step) {
        await this.producer.publish({
          type: `${step.compensate}_COMMAND`,
          sagaId: state.sagaId,
          payload: state.context,
          timestamp: Date.now(),
          version: 1
        });
      }
    }

    state.status = 'failed';
    this.stateStore.set(state.sagaId, state);
  }

  async startSaga(sagaId: string, initialData: Record<string, unknown>): Promise<void> {
    const state: SagaState = {
      sagaId,
      status: 'running',
      currentStep: 0,
      completedSteps: [],
      context: initialData
    };

    this.stateStore.set(sagaId, state);

    await this.producer.publish({
      type: 'CREATE_ORDER_COMMAND',
      sagaId,
      payload: initialData,
      timestamp: Date.now(),
      version: 1
    });
  }
}

Resumen

Glosario

Apache Kafka

Definicion: Plataforma distribuida de event streaming que persiste eventos como un log inmutable, permitiendo procesamiento en tiempo real y replay de datos historicos.

Por que es importante: Maneja millones de eventos por segundo con baja latencia, ideal para sistemas de alta escala donde el historial de eventos es valioso.

Ejemplo practico: Una saga que procesa 10,000 pedidos por minuto usa Kafka para garantizar que ningun evento se pierda y todos se procesen en orden.


Topic (Kafka)

Definicion: Canal de eventos con nombre donde los productores escriben y los consumidores leen. Los topics se dividen en particiones para paralelismo.

Por que es importante: Organiza eventos por categoria o dominio, permitiendo que diferentes servicios se suscriban solo a los topics relevantes.

Ejemplo practico: El topic payment-events contiene todos los eventos de pagos (iniciado, completado, fallido), mientras inventory-events contiene eventos de inventario.


Particion (Kafka)

Definicion: Subdivision de un topic que permite procesamiento paralelo. Los mensajes con la misma clave siempre van a la misma particion.

Por que es importante: Permite escalar horizontalmente manteniendo orden por clave. Cada particion puede ser procesada por un consumer diferente.

Ejemplo practico: Un topic con 6 particiones puede ser procesado por 6 consumers simultaneamente. Todos los eventos de saga-123 van a la particion 2 y se procesan en orden.


Consumer Group

Definicion: Grupo de consumidores que comparten el trabajo de procesar un topic. Kafka distribuye particiones entre los miembros del grupo.

Por que es importante: Permite escalar consumidores horizontalmente. Agregar mas consumers al grupo aumenta el throughput automaticamente.

Ejemplo practico: El grupo order-service tiene 3 instancias. Kafka asigna 2 particiones a cada una, distribuyendo la carga equitativamente.


Offset

Definicion: Posicion de un mensaje dentro de una particion. Los consumers rastrean su offset para saber que mensajes ya procesaron.

Por que es importante: Permite reanudar el consumo desde donde se dejo, y habilita replay de eventos moviendo el offset hacia atras.

Ejemplo practico: El consumer fallo en el offset 1000. Al reiniciar, continua desde 1000 en lugar de reprocesar los 999 mensajes anteriores.


Idempotencia (Producer)

Definicion: Garantia de que cada mensaje se escribe exactamente una vez al topic, incluso si hay reintentos por fallos de red.

Por que es importante: Evita mensajes duplicados que podrian causar efectos secundarios no deseados como cobros dobles.

Ejemplo practico: El producer envia “cobrar $100” pero no recibe confirmacion por timeout. Reintenta, pero Kafka detecta el duplicado y no lo escribe de nuevo.


Event Streaming

Definicion: Paradigma donde los eventos se tratan como un flujo continuo de datos que se procesan en tiempo real y se persisten para analisis futuro.

Por que es importante: Permite construir sistemas reactivos que responden inmediatamente a eventos, y habilita casos de uso como replay, auditoria y analytics.

Ejemplo practico: Ademas de procesar pagos en tiempo real, podemos analizar patrones de compra del ultimo mes leyendo eventos historicos del topic.


Log Inmutable

Definicion: Estructura de datos donde los registros solo se agregan al final y nunca se modifican o eliminan (hasta que expiran por retencion).

Por que es importante: Proporciona un historial completo y confiable de eventos, util para auditoria, debugging y reconstruccion de estado.

Ejemplo practico: Si hay una disputa sobre un pedido, podemos ver exactamente que eventos ocurrieron y en que orden, reconstruyendo la historia completa.


← Capítulo 18: RabbitMQ | Capítulo 20: Frontend Tracking →