← Volver al listado de tecnologías

Capítulo 12: Event Store con MongoDB en Go

Por: SiempreListo
event-sourcinggomongodbevent-store

Capítulo 12: Event Store con MongoDB en Go

“MongoDB ofrece flexibilidad y escalabilidad para Event Stores”

Por Qué MongoDB para Event Sourcing

MongoDB es una base de datos NoSQL orientada a documentos que ofrece características valiosas para Event Sourcing:

En este capítulo implementaremos un Event Store completo usando MongoDB como backend de persistencia.

Setup de MongoDB

# Agregar driver de MongoDB
go get go.mongodb.org/mongo-driver/mongo
# docker-compose.yml
services:
  mongodb:
    image: mongo:7
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: orderflow
      MONGO_INITDB_ROOT_PASSWORD: orderflow
    volumes:
      - mongo_data:/data/db

volumes:
  mongo_data:

Implementación del Event Store

La implementación usa el driver oficial de MongoDB para Go. Los conceptos clave son:

// internal/infrastructure/eventstore/mongodb.go
package eventstore

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"

    "github.com/siemprelisto/orderflow-es-go/internal/domain/events"
)

type MongoEventStore struct {
    client     *mongo.Client
    db         *mongo.Database
    events     *mongo.Collection
    counters   *mongo.Collection
}

type mongoEvent struct {
    GlobalPosition int64     `bson:"globalPosition"`
    StreamID       string    `bson:"streamId"`
    StreamPosition int       `bson:"streamPosition"`
    EventType      string    `bson:"eventType"`
    Data           bson.Raw  `bson:"data"`
    Metadata       bson.Raw  `bson:"metadata"`
    CreatedAt      time.Time `bson:"createdAt"`
}

func NewMongoEventStore(ctx context.Context, uri string) (*MongoEventStore, error) {
    client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
    if err != nil {
        return nil, fmt.Errorf("failed to connect to MongoDB: %w", err)
    }

    db := client.Database("orderflow")
    store := &MongoEventStore{
        client:   client,
        db:       db,
        events:   db.Collection("events"),
        counters: db.Collection("counters"),
    }

    if err := store.createIndexes(ctx); err != nil {
        return nil, err
    }

    return store, nil
}

func (s *MongoEventStore) createIndexes(ctx context.Context) error {
    indexes := []mongo.IndexModel{
        {
            Keys:    bson.D{{Key: "streamId", Value: 1}, {Key: "streamPosition", Value: 1}},
            Options: options.Index().SetUnique(true),
        },
        {
            Keys: bson.D{{Key: "globalPosition", Value: 1}},
        },
        {
            Keys: bson.D{{Key: "eventType", Value: 1}},
        },
    }

    _, err := s.events.Indexes().CreateMany(ctx, indexes)
    return err
}

func (s *MongoEventStore) getNextGlobalPosition(ctx context.Context) (int64, error) {
    filter := bson.M{"_id": "globalPosition"}
    update := bson.M{"$inc": bson.M{"value": 1}}
    opts := options.FindOneAndUpdate().
        SetUpsert(true).
        SetReturnDocument(options.After)

    var result struct {
        Value int64 `bson:"value"`
    }

    err := s.counters.FindOneAndUpdate(ctx, filter, update, opts).Decode(&result)
    if err != nil {
        return 0, err
    }

    return result.Value, nil
}

func (s *MongoEventStore) Append(
    ctx context.Context,
    streamID string,
    domainEvents []events.DomainEvent,
    expectedVersion int,
) (AppendResult, error) {
    if len(domainEvents) == 0 {
        return AppendResult{
            NextExpectedVersion: expectedVersion,
            GlobalPosition:      0,
            EventsAppended:      0,
        }, nil
    }

    // Iniciar sesión para transacción
    session, err := s.client.StartSession()
    if err != nil {
        return AppendResult{}, err
    }
    defer session.EndSession(ctx)

    var result AppendResult

    _, err = session.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) {
        // Verificar versión actual
        currentVersion, err := s.getStreamVersionInTx(sessCtx, streamID)
        if err != nil {
            return nil, err
        }

        if currentVersion != expectedVersion {
            return nil, ConcurrencyError{
                StreamID:        streamID,
                ExpectedVersion: expectedVersion,
                ActualVersion:   currentVersion,
            }
        }

        // Preparar documentos
        docs := make([]interface{}, len(domainEvents))
        var lastPosition int64

        for i, event := range domainEvents {
            globalPos, err := s.getNextGlobalPosition(sessCtx)
            if err != nil {
                return nil, err
            }
            lastPosition = globalPos

            payload, _ := json.Marshal(event.Payload())
            metadata, _ := json.Marshal(event.Metadata())

            docs[i] = mongoEvent{
                GlobalPosition: globalPos,
                StreamID:       streamID,
                StreamPosition: expectedVersion + 1 + i,
                EventType:      event.EventType(),
                Data:           payload,
                Metadata:       metadata,
                CreatedAt:      time.Now().UTC(),
            }
        }

        // Insertar eventos
        _, err = s.events.InsertMany(sessCtx, docs)
        if err != nil {
            return nil, err
        }

        result = AppendResult{
            NextExpectedVersion: expectedVersion + len(domainEvents),
            GlobalPosition:      lastPosition,
            EventsAppended:      len(domainEvents),
        }

        return nil, nil
    })

    return result, err
}

func (s *MongoEventStore) getStreamVersionInTx(
    ctx context.Context,
    streamID string,
) (int, error) {
    opts := options.FindOne().SetSort(bson.D{{Key: "streamPosition", Value: -1}})

    var event mongoEvent
    err := s.events.FindOne(ctx, bson.M{"streamId": streamID}, opts).Decode(&event)

    if err == mongo.ErrNoDocuments {
        return -1, nil
    }
    if err != nil {
        return 0, err
    }

    return event.StreamPosition, nil
}

func (s *MongoEventStore) ReadStream(
    ctx context.Context,
    streamID string,
) ([]StoredEvent, error) {
    return s.ReadStreamFrom(ctx, streamID, 0)
}

func (s *MongoEventStore) ReadStreamFrom(
    ctx context.Context,
    streamID string,
    fromVersion int,
) ([]StoredEvent, error) {
    filter := bson.M{
        "streamId":       streamID,
        "streamPosition": bson.M{"$gte": fromVersion},
    }
    opts := options.Find().SetSort(bson.D{{Key: "streamPosition", Value: 1}})

    cursor, err := s.events.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var events []StoredEvent
    for cursor.Next(ctx) {
        var doc mongoEvent
        if err := cursor.Decode(&doc); err != nil {
            return nil, err
        }

        events = append(events, StoredEvent{
            GlobalPosition: doc.GlobalPosition,
            StreamID:       doc.StreamID,
            StreamPosition: doc.StreamPosition,
            EventType:      doc.EventType,
            Data:           doc.Data,
            Metadata:       doc.Metadata,
            CreatedAt:      doc.CreatedAt,
        })
    }

    return events, cursor.Err()
}

func (s *MongoEventStore) ReadAll(
    ctx context.Context,
    fromPosition int64,
    limit int,
) ([]StoredEvent, error) {
    filter := bson.M{"globalPosition": bson.M{"$gte": fromPosition}}
    opts := options.Find().
        SetSort(bson.D{{Key: "globalPosition", Value: 1}}).
        SetLimit(int64(limit))

    cursor, err := s.events.Find(ctx, filter, opts)
    if err != nil {
        return nil, err
    }
    defer cursor.Close(ctx)

    var events []StoredEvent
    for cursor.Next(ctx) {
        var doc mongoEvent
        if err := cursor.Decode(&doc); err != nil {
            return nil, err
        }

        events = append(events, StoredEvent{
            GlobalPosition: doc.GlobalPosition,
            StreamID:       doc.StreamID,
            StreamPosition: doc.StreamPosition,
            EventType:      doc.EventType,
            Data:           doc.Data,
            Metadata:       doc.Metadata,
            CreatedAt:      doc.CreatedAt,
        })
    }

    return events, cursor.Err()
}

func (s *MongoEventStore) GetStreamVersion(
    ctx context.Context,
    streamID string,
) (int, error) {
    return s.getStreamVersionInTx(ctx, streamID)
}

func (s *MongoEventStore) Close(ctx context.Context) error {
    return s.client.Disconnect(ctx)
}

Testing con Testcontainers

Testcontainers es una librería que levanta contenedores Docker durante los tests. Esto permite probar contra una instancia real de MongoDB en lugar de mocks, garantizando que el código funciona con la base de datos real.

Cada test obtiene una instancia limpia de MongoDB, eliminando efectos secundarios entre tests.

// internal/infrastructure/eventstore/mongodb_test.go
package eventstore_test

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/testcontainers/testcontainers-go"
    "github.com/testcontainers/testcontainers-go/modules/mongodb"

    "github.com/siemprelisto/orderflow-es-go/internal/domain/events"
    "github.com/siemprelisto/orderflow-es-go/internal/infrastructure/eventstore"
)

func setupMongoDB(t *testing.T) (*eventstore.MongoEventStore, func()) {
    ctx := context.Background()

    container, err := mongodb.Run(ctx, "mongo:7")
    require.NoError(t, err)

    uri, err := container.ConnectionString(ctx)
    require.NoError(t, err)

    store, err := eventstore.NewMongoEventStore(ctx, uri)
    require.NoError(t, err)

    cleanup := func() {
        store.Close(ctx)
        container.Terminate(ctx)
    }

    return store, cleanup
}

func TestMongoEventStore_Append(t *testing.T) {
    store, cleanup := setupMongoDB(t)
    defer cleanup()

    ctx := context.Background()

    event := events.NewEvent(
        "TestEvent",
        "stream-1",
        "Test",
        0,
        map[string]string{"key": "value"},
    )

    result, err := store.Append(ctx, "stream-1", []events.DomainEvent{event}, -1)

    assert.NoError(t, err)
    assert.Equal(t, 1, result.EventsAppended)
    assert.Equal(t, 0, result.NextExpectedVersion)
}

func TestMongoEventStore_ConcurrencyConflict(t *testing.T) {
    store, cleanup := setupMongoDB(t)
    defer cleanup()

    ctx := context.Background()

    // Primer append
    event1 := events.NewEvent("Event1", "stream-1", "Test", 0, nil)
    _, err := store.Append(ctx, "stream-1", []events.DomainEvent{event1}, -1)
    require.NoError(t, err)

    // Segundo append con versión incorrecta
    event2 := events.NewEvent("Event2", "stream-1", "Test", 1, nil)
    _, err = store.Append(ctx, "stream-1", []events.DomainEvent{event2}, -1)

    var concErr eventstore.ConcurrencyError
    assert.ErrorAs(t, err, &concErr)
    assert.Equal(t, 0, concErr.ActualVersion)
}

func TestMongoEventStore_ReadStream(t *testing.T) {
    store, cleanup := setupMongoDB(t)
    defer cleanup()

    ctx := context.Background()

    // Append eventos
    events := []events.DomainEvent{
        events.NewEvent("E1", "stream-1", "Test", 0, nil),
        events.NewEvent("E2", "stream-1", "Test", 1, nil),
        events.NewEvent("E3", "stream-1", "Test", 2, nil),
    }
    _, err := store.Append(ctx, "stream-1", events, -1)
    require.NoError(t, err)

    // Leer stream
    stored, err := store.ReadStream(ctx, "stream-1")
    require.NoError(t, err)

    assert.Len(t, stored, 3)
    assert.Equal(t, "E1", stored[0].EventType)
    assert.Equal(t, "E2", stored[1].EventType)
    assert.Equal(t, "E3", stored[2].EventType)
}

Connection Factory

El patrón Factory encapsula la lógica de creación de conexiones, centralizando la configuración y facilitando cambios futuros. La configuración se lee de variables de entorno, siguiendo el principio de “12-factor apps”.

// internal/infrastructure/database/mongo.go
package database

import (
    "context"
    "fmt"
    "os"

    "github.com/siemprelisto/orderflow-es-go/internal/infrastructure/eventstore"
)

type MongoConfig struct {
    URI      string
    Database string
}

func LoadMongoConfig() MongoConfig {
    uri := os.Getenv("MONGODB_URI")
    if uri == "" {
        uri = "mongodb://orderflow:orderflow@localhost:27017"
    }

    return MongoConfig{
        URI:      uri,
        Database: "orderflow",
    }
}

func NewMongoEventStore(ctx context.Context) (*eventstore.MongoEventStore, error) {
    config := LoadMongoConfig()
    return eventstore.NewMongoEventStore(ctx, config.URI)
}

Resumen

Glosario

BSON (Binary JSON)

Definición: Formato binario usado por MongoDB para representar documentos. Es más eficiente que JSON en espacio y velocidad de parsing.

Por qué es importante: Permite almacenar eventos con tipos de datos ricos (fechas, binarios, decimales) que JSON no soporta nativamente.

Ejemplo práctico: bson.M{"streamId": "order-123"} crea un documento BSON para filtrar eventos de un stream específico.


Índice Compuesto

Definición: Índice que incluye múltiples campos, optimizando consultas que filtran por esa combinación de campos.

Por qué es importante: El índice {streamId, streamPosition} permite buscar eventos de un stream ordenados por posición en O(log n).

Ejemplo práctico: Sin el índice, leer 100 eventos de un stream entre millones requeriría escanear toda la colección.


FindOneAndUpdate con Upsert

Definición: Operación atómica que busca un documento, lo actualiza si existe, o lo crea si no existe.

Por qué es importante: Genera contadores globales únicos sin condiciones de carrera, incluso con múltiples escritores concurrentes.

Ejemplo práctico: $inc: {value: 1} incrementa atómicamente el contador de posición global cada vez que se escribe un evento.


Testcontainers

Definición: Librería que permite crear contenedores Docker desechables durante la ejecución de tests.

Por qué es importante: Prueba código contra dependencias reales (MongoDB, PostgreSQL, Redis) en lugar de mocks, detectando problemas de integración temprano.

Ejemplo práctico: mongodb.Run(ctx, "mongo:7") levanta MongoDB 7, ejecuta los tests, y destruye el contenedor al finalizar.


Sesión y Transacción en MongoDB

Definición: Una sesión agrupa operaciones relacionadas; una transacción garantiza que todas se aplican o ninguna (ACID).

Por qué es importante: Permite verificar la versión esperada del stream y escribir eventos de forma atómica, evitando corrupción por escrituras concurrentes.

Ejemplo práctico: session.WithTransaction() ejecuta verificación de versión + inserción de eventos; si falla cualquiera, se hace rollback automático.


Cursor en MongoDB

Definición: Iterador que permite procesar resultados de una consulta de forma eficiente, sin cargar todo en memoria.

Por qué es importante: Cuando un stream tiene miles de eventos, el cursor los entrega uno a uno evitando consumir memoria excesiva.

Ejemplo práctico: cursor.Next(ctx) avanza al siguiente documento; cursor.Decode(&doc) lo deserializa a un struct Go.


12-Factor App

Definición: Metodología para construir aplicaciones cloud-native que establece 12 principios, incluyendo configuración via variables de entorno.

Por qué es importante: Permite cambiar configuración (URL de base de datos, credenciales) sin recompilar, facilitando deployment en diferentes entornos.

Ejemplo práctico: os.Getenv("MONGODB_URI") lee la URL de conexión del entorno, permitiendo que el mismo binario funcione en dev, staging y producción.


← Capítulo 11: Go Setup | Capítulo 13: Agregados y Repositorios en Go →