← Volver al listado de tecnologías

Capítulo 11: Event Sourcing en Go - Setup

Por: SiempreListo
event-sourcinggosetuparquitectura

Capítulo 11: Event Sourcing en Go - Setup

“Go ofrece simplicidad y rendimiento para sistemas event-sourced”

Por Qué Go para Event Sourcing

Go es un lenguaje compilado desarrollado por Google que destaca por su simplicidad, rendimiento y excelente soporte para concurrencia. Estas características lo hacen ideal para sistemas event-sourced donde necesitamos:

En este capítulo configuraremos un proyecto Go con arquitectura hexagonal, preparando la base para implementar Event Sourcing.

Inicialización del Proyecto

# Crear directorio
mkdir orderflow-es-go && cd orderflow-es-go

# Inicializar módulo Go
# El comando 'go mod init' crea un archivo go.mod que define el módulo
# y sus dependencias. El nombre sigue la convención de ruta del repositorio.
go mod init github.com/siemprelisto/orderflow-es-go

# Instalar dependencias
# uuid: genera identificadores únicos para eventos y agregados
go get github.com/google/uuid
# pgx: driver PostgreSQL de alto rendimiento (mejor que database/sql)
go get github.com/jackc/pgx/v5
# chi: router HTTP ligero y composable
go get github.com/go-chi/chi/v5
# validator: validación de structs mediante tags
go get github.com/go-playground/validator/v10

Estructura del Proyecto

La estructura sigue la convención estándar de Go y los principios de arquitectura hexagonal:

orderflow-es-go/
├── cmd/                          # Puntos de entrada de la aplicación
│   └── api/
│       └── main.go               # Servidor HTTP principal
├── internal/                     # Código privado del módulo (no exportable)
│   ├── domain/                   # Capa de dominio (núcleo del negocio)
│   │   ├── events/               # Definición de eventos de dominio
│   │   │   ├── base.go           # Evento base e interfaz DomainEvent
│   │   │   └── order_events.go   # Eventos específicos de Order
│   │   ├── aggregates/           # Agregados (raíces de consistencia)
│   │   │   └── order/
│   │   │       ├── order.go      # Lógica del agregado Order
│   │   │       └── order_test.go
│   │   └── valueobjects/         # Value Objects inmutables
│   │       ├── address.go
│   │       └── money.go
│   ├── application/              # Capa de aplicación (casos de uso)
│   │   ├── commands/             # Comandos (intención de modificar)
│   │   │   └── create_order.go
│   │   ├── queries/              # Consultas (solo lectura)
│   │   │   └── get_order.go
│   │   └── projections/          # Proyecciones (read models)
│   │       └── orders_projection.go
│   └── infrastructure/           # Capa de infraestructura (adaptadores)
│       ├── eventstore/           # Implementaciones del Event Store
│       │   ├── interface.go      # Puerto (interfaz abstracta)
│       │   ├── postgres.go       # Adaptador PostgreSQL
│       │   └── inmemory.go       # Adaptador en memoria (testing)
│       ├── repository/
│       │   └── order_repository.go
│       └── database/
│           └── postgres.go
├── pkg/                          # Código público reutilizable
│   └── errors/
│       └── errors.go
├── go.mod                        # Definición del módulo y dependencias
├── go.sum                        # Checksums de dependencias
└── Makefile                      # Automatización de tareas

La carpeta internal/ es especial en Go: el compilador impide que otros módulos importen paquetes desde ella, garantizando encapsulación.

Eventos Base

Los eventos de dominio son el corazón de Event Sourcing. En Go, usamos interfaces para definir contratos y structs para implementaciones concretas. La interfaz DomainEvent define qué información debe tener cualquier evento del sistema.

// internal/domain/events/base.go
package events

import (
    "encoding/json"
    "time"

    "github.com/google/uuid"
)

type EventMetadata struct {
    CorrelationID string    `json:"correlationId"`
    CausationID   string    `json:"causationId"`
    UserID        string    `json:"userId,omitempty"`
    Timestamp     time.Time `json:"timestamp"`
}

func NewMetadata() EventMetadata {
    id := uuid.New().String()
    return EventMetadata{
        CorrelationID: id,
        CausationID:   id,
        Timestamp:     time.Now().UTC(),
    }
}

type DomainEvent interface {
    EventType() string
    AggregateID() string
    AggregateType() string
    Version() int
    Payload() interface{}
    Metadata() EventMetadata
}

type BaseEvent struct {
    ID            string        `json:"eventId"`
    Type          string        `json:"eventType"`
    AggID         string        `json:"aggregateId"`
    AggType       string        `json:"aggregateType"`
    Ver           int           `json:"version"`
    Data          interface{}   `json:"payload"`
    Meta          EventMetadata `json:"metadata"`
}

func (e BaseEvent) EventType() string      { return e.Type }
func (e BaseEvent) AggregateID() string    { return e.AggID }
func (e BaseEvent) AggregateType() string  { return e.AggType }
func (e BaseEvent) Version() int           { return e.Ver }
func (e BaseEvent) Payload() interface{}   { return e.Data }
func (e BaseEvent) Metadata() EventMetadata { return e.Meta }

func NewEvent(
    eventType string,
    aggregateID string,
    aggregateType string,
    version int,
    payload interface{},
) BaseEvent {
    return BaseEvent{
        ID:      uuid.New().String(),
        Type:    eventType,
        AggID:   aggregateID,
        AggType: aggregateType,
        Ver:     version,
        Data:    payload,
        Meta:    NewMetadata(),
    }
}

// Serialización
func (e BaseEvent) MarshalPayload() ([]byte, error) {
    return json.Marshal(e.Data)
}

func (e BaseEvent) MarshalMetadata() ([]byte, error) {
    return json.Marshal(e.Meta)
}

Value Objects

Los Value Objects son objetos inmutables que se definen por sus atributos, no por una identidad. En Go, usamos structs con validación en el constructor. La inmutabilidad se logra devolviendo nuevas instancias en lugar de modificar la existente.

// internal/domain/valueobjects/money.go
package valueobjects

import "errors"

type Money struct {
    Amount   float64 `json:"amount" validate:"gte=0"`
    Currency string  `json:"currency" validate:"len=3"`
}

func NewMoney(amount float64, currency string) (Money, error) {
    if amount < 0 {
        return Money{}, errors.New("amount cannot be negative")
    }
    if len(currency) != 3 {
        return Money{}, errors.New("currency must be 3 characters")
    }
    return Money{Amount: amount, Currency: currency}, nil
}

func (m Money) Add(other Money) (Money, error) {
    if m.Currency != other.Currency {
        return Money{}, errors.New("cannot add different currencies")
    }
    return Money{Amount: m.Amount + other.Amount, Currency: m.Currency}, nil
}

func (m Money) Multiply(factor float64) Money {
    return Money{Amount: m.Amount * factor, Currency: m.Currency}
}

// internal/domain/valueobjects/address.go
package valueobjects

type Address struct {
    Street  string `json:"street" validate:"required"`
    City    string `json:"city" validate:"required"`
    State   string `json:"state" validate:"required"`
    ZipCode string `json:"zipCode" validate:"required"`
    Country string `json:"country" validate:"required,len=2"`
}

func NewAddress(street, city, state, zipCode, country string) (Address, error) {
    addr := Address{
        Street:  street,
        City:    city,
        State:   state,
        ZipCode: zipCode,
        Country: country,
    }
    // Validación básica
    if street == "" || city == "" || state == "" || zipCode == "" {
        return Address{}, errors.New("all address fields are required")
    }
    return addr, nil
}

Eventos de Order

// internal/domain/events/order_events.go
package events

import (
    "time"
    vo "github.com/siemprelisto/orderflow-es-go/internal/domain/valueobjects"
)

// OrderCreated
type OrderCreatedPayload struct {
    CustomerID      string        `json:"customerId"`
    CustomerEmail   string        `json:"customerEmail"`
    Items           []OrderItem   `json:"items"`
    ShippingAddress vo.Address    `json:"shippingAddress"`
    Subtotal        vo.Money      `json:"subtotal"`
    Tax             vo.Money      `json:"tax"`
    Total           vo.Money      `json:"total"`
}

type OrderItem struct {
    ProductID   string   `json:"productId"`
    ProductName string   `json:"productName"`
    SKU         string   `json:"sku"`
    Quantity    int      `json:"quantity"`
    UnitPrice   vo.Money `json:"unitPrice"`
}

func NewOrderCreated(orderID string, version int, payload OrderCreatedPayload) BaseEvent {
    return NewEvent("OrderCreated", orderID, "Order", version, payload)
}

// OrderItemAdded
type OrderItemAddedPayload struct {
    Item        OrderItem `json:"item"`
    NewSubtotal vo.Money  `json:"newSubtotal"`
    NewTotal    vo.Money  `json:"newTotal"`
}

func NewOrderItemAdded(orderID string, version int, payload OrderItemAddedPayload) BaseEvent {
    return NewEvent("OrderItemAdded", orderID, "Order", version, payload)
}

// OrderConfirmed
type OrderConfirmedPayload struct {
    ConfirmedAt       time.Time `json:"confirmedAt"`
    EstimatedDelivery time.Time `json:"estimatedDelivery"`
}

func NewOrderConfirmed(orderID string, version int, payload OrderConfirmedPayload) BaseEvent {
    return NewEvent("OrderConfirmed", orderID, "Order", version, payload)
}

// PaymentReceived
type PaymentReceivedPayload struct {
    PaymentID     string   `json:"paymentId"`
    Amount        vo.Money `json:"amount"`
    Method        string   `json:"method"`
    TransactionID string   `json:"transactionId"`
    PaidAt        time.Time `json:"paidAt"`
}

// OrderShipped
type OrderShippedPayload struct {
    TrackingNumber    string    `json:"trackingNumber"`
    Carrier           string    `json:"carrier"`
    ShippedAt         time.Time `json:"shippedAt"`
    EstimatedDelivery time.Time `json:"estimatedDelivery"`
}

// OrderCancelled
type OrderCancelledPayload struct {
    Reason         string    `json:"reason"`
    CancelledBy    string    `json:"cancelledBy"`
    CancelledAt    time.Time `json:"cancelledAt"`
    RefundRequired bool      `json:"refundRequired"`
}

Event Store Interface

En Go, las interfaces se definen implícitamente: cualquier tipo que implemente todos los métodos de una interfaz la satisface automáticamente. Esto permite crear puertos (interfaces) en el dominio que la infraestructura implementa sin dependencias directas.

El EventStore es el puerto central para persistir y recuperar eventos. Define las operaciones fundamentales de append-only.

// internal/infrastructure/eventstore/interface.go
package eventstore

import (
    "context"
    "time"

    "github.com/siemprelisto/orderflow-es-go/internal/domain/events"
)

type StoredEvent struct {
    GlobalPosition int64
    StreamID       string
    StreamPosition int
    EventType      string
    Data           []byte
    Metadata       []byte
    CreatedAt      time.Time
}

type AppendResult struct {
    NextExpectedVersion int
    GlobalPosition      int64
    EventsAppended      int
}

type ConcurrencyError struct {
    StreamID        string
    ExpectedVersion int
    ActualVersion   int
}

func (e ConcurrencyError) Error() string {
    return fmt.Sprintf(
        "concurrency conflict on stream %s: expected %d, actual %d",
        e.StreamID, e.ExpectedVersion, e.ActualVersion,
    )
}

type EventStore interface {
    Append(
        ctx context.Context,
        streamID string,
        events []events.DomainEvent,
        expectedVersion int,
    ) (AppendResult, error)

    ReadStream(
        ctx context.Context,
        streamID string,
    ) ([]StoredEvent, error)

    ReadStreamFrom(
        ctx context.Context,
        streamID string,
        fromVersion int,
    ) ([]StoredEvent, error)

    ReadAll(
        ctx context.Context,
        fromPosition int64,
        limit int,
    ) ([]StoredEvent, error)

    GetStreamVersion(
        ctx context.Context,
        streamID string,
    ) (int, error)
}

Makefile

# Makefile
.PHONY: build run test lint migrate

build:
	go build -o bin/api ./cmd/api

run:
	go run ./cmd/api

test:
	go test -v ./...

test-coverage:
	go test -coverprofile=coverage.out ./...
	go tool cover -html=coverage.out

lint:
	golangci-lint run

migrate-up:
	migrate -path ./migrations -database "postgres://localhost:5432/orderflow?sslmode=disable" up

migrate-down:
	migrate -path ./migrations -database "postgres://localhost:5432/orderflow?sslmode=disable" down

docker-up:
	docker-compose up -d

docker-down:
	docker-compose down

Main Entry Point

El punto de entrada configura dependencias y arranca el servidor. Implementamos graceful shutdown para cerrar conexiones limpiamente cuando el proceso recibe señales del sistema operativo (SIGINT, SIGTERM).

// cmd/api/main.go
package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/go-chi/chi/v5"
    "github.com/go-chi/chi/v5/middleware"
)

func main() {
    r := chi.NewRouter()

    // Middleware
    r.Use(middleware.Logger)
    r.Use(middleware.Recoverer)
    r.Use(middleware.Timeout(30 * time.Second))

    // Health check
    r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte(`{"status":"ok"}`))
    })

    // Server
    srv := &http.Server{
        Addr:    ":3000",
        Handler: r,
    }

    // Graceful shutdown
    go func() {
        sigCh := make(chan os.Signal, 1)
        signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
        <-sigCh

        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        if err := srv.Shutdown(ctx); err != nil {
            log.Printf("Server shutdown error: %v", err)
        }
    }()

    log.Println("Server starting on :3000")
    if err := srv.ListenAndServe(); err != http.ErrServerClosed {
        log.Fatalf("Server error: %v", err)
    }
}

Resumen

Glosario

Módulo Go (go.mod)

Definición: Unidad de distribución de código en Go que agrupa paquetes relacionados y define sus dependencias.

Por qué es importante: Permite gestionar versiones de dependencias de forma reproducible y aísla tu código de conflictos con otros proyectos.

Ejemplo práctico: go mod init github.com/mi-org/mi-proyecto crea un módulo. Luego go get agrega dependencias al archivo go.mod automáticamente.


Goroutine

Definición: Hilo ligero gestionado por el runtime de Go, no por el sistema operativo. Consume aproximadamente 2KB de stack inicial.

Por qué es importante: Permite manejar miles de operaciones concurrentes (como procesar eventos) con bajo overhead de memoria.

Ejemplo práctico: go func() { procesarEvento(e) }() ejecuta el procesamiento en paralelo sin bloquear el hilo principal.


Interface en Go

Definición: Contrato que define un conjunto de métodos sin implementación. En Go, las interfaces se satisfacen implícitamente.

Por qué es importante: Permite crear abstracciones (puertos) que desacoplan el dominio de la infraestructura, facilitando testing y cambio de implementaciones.

Ejemplo práctico: EventStore es una interfaz; PostgresEventStore y InMemoryEventStore son implementaciones que la satisfacen automáticamente.


Carpeta internal/

Definición: Directorio especial en Go cuyo código no puede ser importado por módulos externos.

Por qué es importante: Garantiza encapsulación a nivel de compilador, protegiendo la lógica interna del proyecto de uso externo no autorizado.

Ejemplo práctico: El código en internal/domain/ solo es accesible desde dentro de tu módulo, no desde otros proyectos que lo importen.


Graceful Shutdown

Definición: Proceso de cerrar una aplicación de manera ordenada, completando operaciones en curso antes de terminar.

Por qué es importante: Evita pérdida de datos y conexiones huérfanas cuando el servidor se reinicia o escala.

Ejemplo práctico: Al recibir SIGTERM, el servidor deja de aceptar nuevas conexiones pero espera hasta 10 segundos para que las existentes terminen.


Chi Router

Definición: Enrutador HTTP ligero y composable para Go, compatible con el estándar net/http.

Por qué es importante: Ofrece middleware, agrupación de rutas y parámetros URL sin dependencias pesadas ni magia.

Ejemplo práctico: r.Get("/orders/{id}", handler) define una ruta con parámetro dinámico que Chi extrae automáticamente.


← Capítulo 10: Proyecciones Tiempo Real | Capítulo 12: Event Store con MongoDB en Go →