← Volver al listado de tecnologías

Capítulo 17: Versionado de Eventos

Por: SiempreListo
event-sourcingversionadoschema-evolutioncompatibilidad

Capítulo 17: Versionado de Eventos

“Los eventos son inmutables, pero los schemas deben evolucionar”

El Desafío de la Evolución de Schemas

En Event Sourcing enfrentamos una tensión fundamental:

  1. Los eventos son inmutables: Una vez almacenados, no deben modificarse (son hechos históricos)
  2. El código evoluciona: Nuevos requisitos requieren nuevos campos, cambios de estructura
  3. Debemos leer eventos antiguos: El sistema debe funcionar con eventos de hace años

Este capítulo explora estrategias para evolucionar schemas de eventos sin romper compatibilidad con datos existentes.

El Problema de la Evolución

Los eventos almacenados son inmutables, pero el código evoluciona:

Estrategias de Versionado

Existen tres estrategias principales, cada una con diferentes trade-offs:

1. Weak Schema (Schema Débil)

La estrategia más simple: campos nuevos son opcionales y tienen valores por defecto. El código actual maneja eventos antiguos proporcionando defaults para campos ausentes:

// V1: Original
interface OrderCreatedV1 {
  orderId: string;
  customerId: string;
  items: OrderItem[];
}

// V2: Agregar campo opcional
interface OrderCreatedV2 {
  orderId: string;
  customerId: string;
  items: OrderItem[];
  customerEmail?: string;  // Nuevo, opcional
  currency?: string;       // Nuevo, con default
}

// Deserialización con defaults
function deserializeOrderCreated(data: unknown): OrderCreatedV2 {
  const base = data as OrderCreatedV1;
  return {
    ...base,
    customerEmail: (data as any).customerEmail ?? undefined,
    currency: (data as any).currency ?? 'USD'
  };
}

2. Event Versioning Explícito

Incluir un campo version en cada evento y manejar múltiples versiones explícitamente en el código. Es más verbose pero más claro sobre qué versión se está procesando:

// Tipos versionados
type OrderCreatedV1 = {
  type: 'OrderCreated';
  version: 1;
  orderId: string;
  customerId: string;
};

type OrderCreatedV2 = {
  type: 'OrderCreated';
  version: 2;
  orderId: string;
  customerId: string;
  customerEmail: string;
  shippingAddress: Address;
};

type OrderCreated = OrderCreatedV1 | OrderCreatedV2;

// Handler que soporta múltiples versiones
function handleOrderCreated(event: OrderCreated): void {
  if (event.version === 1) {
    // Lógica para V1
    console.log(`Order ${event.orderId} created (legacy)`);
  } else {
    // Lógica para V2
    console.log(`Order ${event.orderId} for ${event.customerEmail}`);
  }
}

3. Upcasting (Transformación al Leer)

Transformar eventos antiguos a la versión actual al leerlos. El código de negocio solo conoce la versión más reciente; la transformación ocurre en la capa de infraestructura:

// src/infrastructure/event-store/upcasters.ts
type Upcaster = (event: StoredEvent) => StoredEvent;

const upcasters: Map<string, Upcaster[]> = new Map();

// Registrar upcaster de V1 a V2
upcasters.set('OrderCreated', [
  // V1 -> V2
  (event) => {
    if (event.data.version === 1 || !event.data.version) {
      return {
        ...event,
        data: {
          ...event.data,
          version: 2,
          customerEmail: event.data.customerEmail ?? '[email protected]',
          currency: event.data.currency ?? 'USD'
        }
      };
    }
    return event;
  }
]);

export function upcast(event: StoredEvent): StoredEvent {
  const eventUpcasters = upcasters.get(event.eventType) ?? [];

  return eventUpcasters.reduce(
    (e, upcaster) => upcaster(e),
    event
  );
}

// Uso en el Event Store
class UpcastingEventStore implements EventStore {
  constructor(private inner: EventStore) {}

  async readStream(streamId: string): Promise<StoredEvent[]> {
    const events = await this.inner.readStream(streamId);
    return events.map(upcast);
  }

  // ... otros métodos
}

Implementación Completa

El upcaster se implementa como una cadena de transformaciones ordenadas por versión. Cada regla sabe transformar de una versión específica a la siguiente:

// src/infrastructure/event-store/event-upcaster.ts
interface UpcasterRule {
  fromVersion: number;
  toVersion: number;
  transform: (data: unknown) => unknown;
}

export class EventUpcaster {
  private rules: Map<string, UpcasterRule[]> = new Map();

  register(eventType: string, rule: UpcasterRule): void {
    const existing = this.rules.get(eventType) ?? [];
    existing.push(rule);
    // Ordenar por versión
    existing.sort((a, b) => a.fromVersion - b.fromVersion);
    this.rules.set(eventType, existing);
  }

  upcast(eventType: string, data: unknown): unknown {
    const rules = this.rules.get(eventType);
    if (!rules) return data;

    let currentData = data;
    let currentVersion = (data as any).version ?? 1;

    for (const rule of rules) {
      if (currentVersion === rule.fromVersion) {
        currentData = rule.transform(currentData);
        currentVersion = rule.toVersion;
      }
    }

    return currentData;
  }
}

// Configuración de upcasters
const upcaster = new EventUpcaster();

// OrderCreated: V1 -> V2
upcaster.register('OrderCreated', {
  fromVersion: 1,
  toVersion: 2,
  transform: (data: any) => ({
    ...data,
    version: 2,
    customerEmail: data.customerEmail ?? `customer-${data.customerId}@legacy.local`,
    currency: 'USD'
  })
});

// OrderCreated: V2 -> V3
upcaster.register('OrderCreated', {
  fromVersion: 2,
  toVersion: 3,
  transform: (data: any) => ({
    ...data,
    version: 3,
    // Nuevo: separar nombre y email
    customer: {
      id: data.customerId,
      email: data.customerEmail
    }
  })
});

Go: Upcasting

// internal/infrastructure/eventstore/upcaster.go
package eventstore

import (
    "encoding/json"
)

type UpcasterFunc func(data json.RawMessage) (json.RawMessage, error)

type EventUpcaster struct {
    rules map[string][]UpcasterFunc
}

func NewEventUpcaster() *EventUpcaster {
    return &EventUpcaster{
        rules: make(map[string][]UpcasterFunc),
    }
}

func (u *EventUpcaster) Register(eventType string, fn UpcasterFunc) {
    u.rules[eventType] = append(u.rules[eventType], fn)
}

func (u *EventUpcaster) Upcast(eventType string, data json.RawMessage) (json.RawMessage, error) {
    rules, ok := u.rules[eventType]
    if !ok {
        return data, nil
    }

    current := data
    var err error

    for _, rule := range rules {
        current, err = rule(current)
        if err != nil {
            return nil, err
        }
    }

    return current, nil
}

// Ejemplo de upcaster
func OrderCreatedV1ToV2(data json.RawMessage) (json.RawMessage, error) {
    var v1 struct {
        OrderID    string `json:"orderId"`
        CustomerID string `json:"customerId"`
        Version    int    `json:"version"`
    }

    if err := json.Unmarshal(data, &v1); err != nil {
        return nil, err
    }

    // Ya es V2 o superior
    if v1.Version >= 2 {
        return data, nil
    }

    v2 := map[string]interface{}{
        "orderId":       v1.OrderID,
        "customerId":    v1.CustomerID,
        "version":       2,
        "customerEmail": "[email protected]",
        "currency":      "USD",
    }

    return json.Marshal(v2)
}

Python: Upcasting

# src/orderflow/infrastructure/event_store/upcaster.py
from typing import Callable, Any

UpcasterFunc = Callable[[dict[str, Any]], dict[str, Any]]

class EventUpcaster:
    def __init__(self):
        self._rules: dict[str, list[UpcasterFunc]] = {}

    def register(self, event_type: str, fn: UpcasterFunc) -> None:
        if event_type not in self._rules:
            self._rules[event_type] = []
        self._rules[event_type].append(fn)

    def upcast(self, event_type: str, data: dict[str, Any]) -> dict[str, Any]:
        rules = self._rules.get(event_type, [])
        current = data

        for rule in rules:
            current = rule(current)

        return current

# Configuración
upcaster = EventUpcaster()

def order_created_v1_to_v2(data: dict) -> dict:
    version = data.get("version", 1)
    if version >= 2:
        return data

    return {
        **data,
        "version": 2,
        "customerEmail": data.get("customerEmail", "[email protected]"),
        "currency": data.get("currency", "USD"),
    }

upcaster.register("OrderCreated", order_created_v1_to_v2)

Mejores Prácticas

PrácticaDescripción
Backward compatibleNuevos campos opcionales, código nuevo lee datos antiguos
Forward compatibleIgnorar campos desconocidos, código antiguo lee datos nuevos
Versionado semánticov1, v2, v3 en el evento indica cambios de estructura
Documentar cambiosChangelog de eventos con motivo y transformación
Tests de migraciónVerificar que cada upcaster transforma correctamente

Regla de oro: Nunca elimines campos de eventos existentes. Márcalos como deprecated y deja de usarlos, pero mantenlos para compatibilidad.

Testing de Upcasters

describe('OrderCreated Upcaster', () => {
  it('should upcast V1 to V2', () => {
    const v1Data = {
      orderId: 'order-1',
      customerId: 'cust-1',
      items: []
    };

    const result = upcaster.upcast('OrderCreated', v1Data);

    expect(result.version).toBe(2);
    expect(result.customerEmail).toBeDefined();
    expect(result.currency).toBe('USD');
  });

  it('should not modify V2 events', () => {
    const v2Data = {
      orderId: 'order-1',
      customerId: 'cust-1',
      customerEmail: '[email protected]',
      currency: 'EUR',
      version: 2
    };

    const result = upcaster.upcast('OrderCreated', v2Data);

    expect(result).toEqual(v2Data);
  });
});

Resumen

Glosario

Schema Evolution (Evolución de Schema)

Definición: Proceso de cambiar la estructura de datos (campos, tipos) mientras se mantiene compatibilidad con datos existentes.

Por qué es importante: En Event Sourcing los eventos persisten para siempre; debemos leer eventos de hace años con código actual.

Ejemplo práctico: Agregar customerEmail a OrderCreated sin romper la lectura de órdenes creadas antes de este cambio.


Backward Compatibility (Compatibilidad Hacia Atrás)

Definición: Código nuevo puede leer/procesar datos creados por versiones anteriores.

Por qué es importante: Después de actualizar el código, el sistema debe seguir funcionando con millones de eventos históricos.

Ejemplo práctico: V2 del handler de OrderCreated procesa tanto eventos V1 (sin email) como V2 (con email) correctamente.


Forward Compatibility (Compatibilidad Hacia Adelante)

Definición: Código antiguo puede leer/procesar datos creados por versiones posteriores (ignora campos desconocidos).

Por qué es importante: Durante deployments graduales, instancias antiguas deben tolerar eventos de instancias ya actualizadas.

Ejemplo práctico: Servicio V1 recibe evento V2 con campo extra loyalty_points; lo ignora y procesa correctamente.


Weak Schema

Definición: Estrategia de versionado donde todos los campos nuevos son opcionales con valores por defecto.

Por qué es importante: Es la estrategia más simple; no requiere transformaciones ni versionado explícito.

Ejemplo práctico: const currency = event.data.currency ?? 'USD' proporciona default para eventos antiguos sin el campo.


Upcasting

Definición: Transformación de eventos de versiones antiguas a la versión actual al momento de leerlos.

Por qué es importante: El código de negocio solo conoce la versión más reciente; la complejidad de versiones queda en infraestructura.

Ejemplo práctico: Evento V1 en disco se transforma a V2 en memoria antes de pasarlo al agregado, que solo entiende V2.


Upcaster Chain

Definición: Secuencia ordenada de transformaciones que convierten un evento de cualquier versión a la más reciente.

Por qué es importante: Si hay V1, V2, V3, un evento V1 pasa por V1->V2 y luego V2->V3 automáticamente.

Ejemplo práctico: upcasters.set('OrderCreated', [v1ToV2, v2ToV3]) define la cadena; un evento V1 se transforma dos veces.


Breaking Change

Definición: Cambio en el schema que rompe compatibilidad con datos o código existente.

Por qué es importante: En Event Sourcing, los breaking changes son especialmente problemáticos porque no podemos modificar eventos almacenados.

Ejemplo práctico: Cambiar customerId: string a customerId: number rompe la deserialización de eventos anteriores con strings.


← Capítulo 16: Snapshots | Capítulo 18: Event Upcasting Avanzado →