Capítulo 12: Event Store con MongoDB en Go
Capítulo 12: Event Store con MongoDB en Go
“MongoDB ofrece flexibilidad y escalabilidad para Event Stores”
Por Qué MongoDB para Event Sourcing
MongoDB es una base de datos NoSQL orientada a documentos que ofrece características valiosas para Event Sourcing:
- Schema flexible: Los eventos pueden evolucionar sin migraciones de esquema
- BSON nativo: Formato binario eficiente para serializar eventos complejos
- Transacciones: Desde MongoDB 4.0, soporta transacciones ACID multi-documento
- Escalabilidad horizontal: Sharding automático para grandes volúmenes de eventos
- Índices compuestos: Optimizan consultas por stream y posición
En este capítulo implementaremos un Event Store completo usando MongoDB como backend de persistencia.
Setup de MongoDB
# Agregar driver de MongoDB
go get go.mongodb.org/mongo-driver/mongo
# docker-compose.yml
services:
mongodb:
image: mongo:7
ports:
- "27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: orderflow
MONGO_INITDB_ROOT_PASSWORD: orderflow
volumes:
- mongo_data:/data/db
volumes:
mongo_data:
Implementación del Event Store
La implementación usa el driver oficial de MongoDB para Go. Los conceptos clave son:
- Colección de eventos: Almacena todos los eventos como documentos BSON
- Colección de contadores: Genera posiciones globales únicas de forma atómica
- Índices únicos: Garantizan que no haya duplicados por stream+posición
- Transacciones: Aseguran consistencia al verificar versión y escribir eventos
// internal/infrastructure/eventstore/mongodb.go
package eventstore
import (
"context"
"encoding/json"
"fmt"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
)
type MongoEventStore struct {
client *mongo.Client
db *mongo.Database
events *mongo.Collection
counters *mongo.Collection
}
type mongoEvent struct {
GlobalPosition int64 `bson:"globalPosition"`
StreamID string `bson:"streamId"`
StreamPosition int `bson:"streamPosition"`
EventType string `bson:"eventType"`
Data bson.Raw `bson:"data"`
Metadata bson.Raw `bson:"metadata"`
CreatedAt time.Time `bson:"createdAt"`
}
func NewMongoEventStore(ctx context.Context, uri string) (*MongoEventStore, error) {
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
return nil, fmt.Errorf("failed to connect to MongoDB: %w", err)
}
db := client.Database("orderflow")
store := &MongoEventStore{
client: client,
db: db,
events: db.Collection("events"),
counters: db.Collection("counters"),
}
if err := store.createIndexes(ctx); err != nil {
return nil, err
}
return store, nil
}
func (s *MongoEventStore) createIndexes(ctx context.Context) error {
indexes := []mongo.IndexModel{
{
Keys: bson.D{{Key: "streamId", Value: 1}, {Key: "streamPosition", Value: 1}},
Options: options.Index().SetUnique(true),
},
{
Keys: bson.D{{Key: "globalPosition", Value: 1}},
},
{
Keys: bson.D{{Key: "eventType", Value: 1}},
},
}
_, err := s.events.Indexes().CreateMany(ctx, indexes)
return err
}
func (s *MongoEventStore) getNextGlobalPosition(ctx context.Context) (int64, error) {
filter := bson.M{"_id": "globalPosition"}
update := bson.M{"$inc": bson.M{"value": 1}}
opts := options.FindOneAndUpdate().
SetUpsert(true).
SetReturnDocument(options.After)
var result struct {
Value int64 `bson:"value"`
}
err := s.counters.FindOneAndUpdate(ctx, filter, update, opts).Decode(&result)
if err != nil {
return 0, err
}
return result.Value, nil
}
func (s *MongoEventStore) Append(
ctx context.Context,
streamID string,
domainEvents []events.DomainEvent,
expectedVersion int,
) (AppendResult, error) {
if len(domainEvents) == 0 {
return AppendResult{
NextExpectedVersion: expectedVersion,
GlobalPosition: 0,
EventsAppended: 0,
}, nil
}
// Iniciar sesión para transacción
session, err := s.client.StartSession()
if err != nil {
return AppendResult{}, err
}
defer session.EndSession(ctx)
var result AppendResult
_, err = session.WithTransaction(ctx, func(sessCtx mongo.SessionContext) (interface{}, error) {
// Verificar versión actual
currentVersion, err := s.getStreamVersionInTx(sessCtx, streamID)
if err != nil {
return nil, err
}
if currentVersion != expectedVersion {
return nil, ConcurrencyError{
StreamID: streamID,
ExpectedVersion: expectedVersion,
ActualVersion: currentVersion,
}
}
// Preparar documentos
docs := make([]interface{}, len(domainEvents))
var lastPosition int64
for i, event := range domainEvents {
globalPos, err := s.getNextGlobalPosition(sessCtx)
if err != nil {
return nil, err
}
lastPosition = globalPos
payload, _ := json.Marshal(event.Payload())
metadata, _ := json.Marshal(event.Metadata())
docs[i] = mongoEvent{
GlobalPosition: globalPos,
StreamID: streamID,
StreamPosition: expectedVersion + 1 + i,
EventType: event.EventType(),
Data: payload,
Metadata: metadata,
CreatedAt: time.Now().UTC(),
}
}
// Insertar eventos
_, err = s.events.InsertMany(sessCtx, docs)
if err != nil {
return nil, err
}
result = AppendResult{
NextExpectedVersion: expectedVersion + len(domainEvents),
GlobalPosition: lastPosition,
EventsAppended: len(domainEvents),
}
return nil, nil
})
return result, err
}
func (s *MongoEventStore) getStreamVersionInTx(
ctx context.Context,
streamID string,
) (int, error) {
opts := options.FindOne().SetSort(bson.D{{Key: "streamPosition", Value: -1}})
var event mongoEvent
err := s.events.FindOne(ctx, bson.M{"streamId": streamID}, opts).Decode(&event)
if err == mongo.ErrNoDocuments {
return -1, nil
}
if err != nil {
return 0, err
}
return event.StreamPosition, nil
}
func (s *MongoEventStore) ReadStream(
ctx context.Context,
streamID string,
) ([]StoredEvent, error) {
return s.ReadStreamFrom(ctx, streamID, 0)
}
func (s *MongoEventStore) ReadStreamFrom(
ctx context.Context,
streamID string,
fromVersion int,
) ([]StoredEvent, error) {
filter := bson.M{
"streamId": streamID,
"streamPosition": bson.M{"$gte": fromVersion},
}
opts := options.Find().SetSort(bson.D{{Key: "streamPosition", Value: 1}})
cursor, err := s.events.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var events []StoredEvent
for cursor.Next(ctx) {
var doc mongoEvent
if err := cursor.Decode(&doc); err != nil {
return nil, err
}
events = append(events, StoredEvent{
GlobalPosition: doc.GlobalPosition,
StreamID: doc.StreamID,
StreamPosition: doc.StreamPosition,
EventType: doc.EventType,
Data: doc.Data,
Metadata: doc.Metadata,
CreatedAt: doc.CreatedAt,
})
}
return events, cursor.Err()
}
func (s *MongoEventStore) ReadAll(
ctx context.Context,
fromPosition int64,
limit int,
) ([]StoredEvent, error) {
filter := bson.M{"globalPosition": bson.M{"$gte": fromPosition}}
opts := options.Find().
SetSort(bson.D{{Key: "globalPosition", Value: 1}}).
SetLimit(int64(limit))
cursor, err := s.events.Find(ctx, filter, opts)
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
var events []StoredEvent
for cursor.Next(ctx) {
var doc mongoEvent
if err := cursor.Decode(&doc); err != nil {
return nil, err
}
events = append(events, StoredEvent{
GlobalPosition: doc.GlobalPosition,
StreamID: doc.StreamID,
StreamPosition: doc.StreamPosition,
EventType: doc.EventType,
Data: doc.Data,
Metadata: doc.Metadata,
CreatedAt: doc.CreatedAt,
})
}
return events, cursor.Err()
}
func (s *MongoEventStore) GetStreamVersion(
ctx context.Context,
streamID string,
) (int, error) {
return s.getStreamVersionInTx(ctx, streamID)
}
func (s *MongoEventStore) Close(ctx context.Context) error {
return s.client.Disconnect(ctx)
}
Testing con Testcontainers
Testcontainers es una librería que levanta contenedores Docker durante los tests. Esto permite probar contra una instancia real de MongoDB en lugar de mocks, garantizando que el código funciona con la base de datos real.
Cada test obtiene una instancia limpia de MongoDB, eliminando efectos secundarios entre tests.
// internal/infrastructure/eventstore/mongodb_test.go
package eventstore_test
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/mongodb"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
"github.com/siemprelisto/orderflow-es-go/internal/infrastructure/eventstore"
)
func setupMongoDB(t *testing.T) (*eventstore.MongoEventStore, func()) {
ctx := context.Background()
container, err := mongodb.Run(ctx, "mongo:7")
require.NoError(t, err)
uri, err := container.ConnectionString(ctx)
require.NoError(t, err)
store, err := eventstore.NewMongoEventStore(ctx, uri)
require.NoError(t, err)
cleanup := func() {
store.Close(ctx)
container.Terminate(ctx)
}
return store, cleanup
}
func TestMongoEventStore_Append(t *testing.T) {
store, cleanup := setupMongoDB(t)
defer cleanup()
ctx := context.Background()
event := events.NewEvent(
"TestEvent",
"stream-1",
"Test",
0,
map[string]string{"key": "value"},
)
result, err := store.Append(ctx, "stream-1", []events.DomainEvent{event}, -1)
assert.NoError(t, err)
assert.Equal(t, 1, result.EventsAppended)
assert.Equal(t, 0, result.NextExpectedVersion)
}
func TestMongoEventStore_ConcurrencyConflict(t *testing.T) {
store, cleanup := setupMongoDB(t)
defer cleanup()
ctx := context.Background()
// Primer append
event1 := events.NewEvent("Event1", "stream-1", "Test", 0, nil)
_, err := store.Append(ctx, "stream-1", []events.DomainEvent{event1}, -1)
require.NoError(t, err)
// Segundo append con versión incorrecta
event2 := events.NewEvent("Event2", "stream-1", "Test", 1, nil)
_, err = store.Append(ctx, "stream-1", []events.DomainEvent{event2}, -1)
var concErr eventstore.ConcurrencyError
assert.ErrorAs(t, err, &concErr)
assert.Equal(t, 0, concErr.ActualVersion)
}
func TestMongoEventStore_ReadStream(t *testing.T) {
store, cleanup := setupMongoDB(t)
defer cleanup()
ctx := context.Background()
// Append eventos
events := []events.DomainEvent{
events.NewEvent("E1", "stream-1", "Test", 0, nil),
events.NewEvent("E2", "stream-1", "Test", 1, nil),
events.NewEvent("E3", "stream-1", "Test", 2, nil),
}
_, err := store.Append(ctx, "stream-1", events, -1)
require.NoError(t, err)
// Leer stream
stored, err := store.ReadStream(ctx, "stream-1")
require.NoError(t, err)
assert.Len(t, stored, 3)
assert.Equal(t, "E1", stored[0].EventType)
assert.Equal(t, "E2", stored[1].EventType)
assert.Equal(t, "E3", stored[2].EventType)
}
Connection Factory
El patrón Factory encapsula la lógica de creación de conexiones, centralizando la configuración y facilitando cambios futuros. La configuración se lee de variables de entorno, siguiendo el principio de “12-factor apps”.
// internal/infrastructure/database/mongo.go
package database
import (
"context"
"fmt"
"os"
"github.com/siemprelisto/orderflow-es-go/internal/infrastructure/eventstore"
)
type MongoConfig struct {
URI string
Database string
}
func LoadMongoConfig() MongoConfig {
uri := os.Getenv("MONGODB_URI")
if uri == "" {
uri = "mongodb://orderflow:orderflow@localhost:27017"
}
return MongoConfig{
URI: uri,
Database: "orderflow",
}
}
func NewMongoEventStore(ctx context.Context) (*eventstore.MongoEventStore, error) {
config := LoadMongoConfig()
return eventstore.NewMongoEventStore(ctx, config.URI)
}
Resumen
- MongoDB ofrece transacciones para consistencia
- Los índices optimizan búsquedas por stream
- Testcontainers facilita testing de integración
- El contador global garantiza orden de eventos
- La estructura es flexible para evolución del schema
Glosario
BSON (Binary JSON)
Definición: Formato binario usado por MongoDB para representar documentos. Es más eficiente que JSON en espacio y velocidad de parsing.
Por qué es importante: Permite almacenar eventos con tipos de datos ricos (fechas, binarios, decimales) que JSON no soporta nativamente.
Ejemplo práctico: bson.M{"streamId": "order-123"} crea un documento BSON para filtrar eventos de un stream específico.
Índice Compuesto
Definición: Índice que incluye múltiples campos, optimizando consultas que filtran por esa combinación de campos.
Por qué es importante: El índice {streamId, streamPosition} permite buscar eventos de un stream ordenados por posición en O(log n).
Ejemplo práctico: Sin el índice, leer 100 eventos de un stream entre millones requeriría escanear toda la colección.
FindOneAndUpdate con Upsert
Definición: Operación atómica que busca un documento, lo actualiza si existe, o lo crea si no existe.
Por qué es importante: Genera contadores globales únicos sin condiciones de carrera, incluso con múltiples escritores concurrentes.
Ejemplo práctico: $inc: {value: 1} incrementa atómicamente el contador de posición global cada vez que se escribe un evento.
Testcontainers
Definición: Librería que permite crear contenedores Docker desechables durante la ejecución de tests.
Por qué es importante: Prueba código contra dependencias reales (MongoDB, PostgreSQL, Redis) en lugar de mocks, detectando problemas de integración temprano.
Ejemplo práctico: mongodb.Run(ctx, "mongo:7") levanta MongoDB 7, ejecuta los tests, y destruye el contenedor al finalizar.
Sesión y Transacción en MongoDB
Definición: Una sesión agrupa operaciones relacionadas; una transacción garantiza que todas se aplican o ninguna (ACID).
Por qué es importante: Permite verificar la versión esperada del stream y escribir eventos de forma atómica, evitando corrupción por escrituras concurrentes.
Ejemplo práctico: session.WithTransaction() ejecuta verificación de versión + inserción de eventos; si falla cualquiera, se hace rollback automático.
Cursor en MongoDB
Definición: Iterador que permite procesar resultados de una consulta de forma eficiente, sin cargar todo en memoria.
Por qué es importante: Cuando un stream tiene miles de eventos, el cursor los entrega uno a uno evitando consumir memoria excesiva.
Ejemplo práctico: cursor.Next(ctx) avanza al siguiente documento; cursor.Decode(&doc) lo deserializa a un struct Go.
12-Factor App
Definición: Metodología para construir aplicaciones cloud-native que establece 12 principios, incluyendo configuración via variables de entorno.
Por qué es importante: Permite cambiar configuración (URL de base de datos, credenciales) sin recompilar, facilitando deployment en diferentes entornos.
Ejemplo práctico: os.Getenv("MONGODB_URI") lee la URL de conexión del entorno, permitiendo que el mismo binario funcione en dev, staging y producción.
← Capítulo 11: Go Setup | Capítulo 13: Agregados y Repositorios en Go →