Capítulo 11: Event Sourcing en Go - Setup
Capítulo 11: Event Sourcing en Go - Setup
“Go ofrece simplicidad y rendimiento para sistemas event-sourced”
Por Qué Go para Event Sourcing
Go es un lenguaje compilado desarrollado por Google que destaca por su simplicidad, rendimiento y excelente soporte para concurrencia. Estas características lo hacen ideal para sistemas event-sourced donde necesitamos:
- Alta concurrencia: Go maneja miles de goroutines (hilos ligeros) de manera eficiente
- Tipado estático fuerte: Los eventos y agregados se definen con tipos claros que el compilador verifica
- Bajo consumo de memoria: Ideal para workers que procesan streams de eventos
- Compilación rápida: Ciclos de desarrollo ágiles
En este capítulo configuraremos un proyecto Go con arquitectura hexagonal, preparando la base para implementar Event Sourcing.
Inicialización del Proyecto
# Crear directorio
mkdir orderflow-es-go && cd orderflow-es-go
# Inicializar módulo Go
# El comando 'go mod init' crea un archivo go.mod que define el módulo
# y sus dependencias. El nombre sigue la convención de ruta del repositorio.
go mod init github.com/siemprelisto/orderflow-es-go
# Instalar dependencias
# uuid: genera identificadores únicos para eventos y agregados
go get github.com/google/uuid
# pgx: driver PostgreSQL de alto rendimiento (mejor que database/sql)
go get github.com/jackc/pgx/v5
# chi: router HTTP ligero y composable
go get github.com/go-chi/chi/v5
# validator: validación de structs mediante tags
go get github.com/go-playground/validator/v10
Estructura del Proyecto
La estructura sigue la convención estándar de Go y los principios de arquitectura hexagonal:
orderflow-es-go/
├── cmd/ # Puntos de entrada de la aplicación
│ └── api/
│ └── main.go # Servidor HTTP principal
├── internal/ # Código privado del módulo (no exportable)
│ ├── domain/ # Capa de dominio (núcleo del negocio)
│ │ ├── events/ # Definición de eventos de dominio
│ │ │ ├── base.go # Evento base e interfaz DomainEvent
│ │ │ └── order_events.go # Eventos específicos de Order
│ │ ├── aggregates/ # Agregados (raíces de consistencia)
│ │ │ └── order/
│ │ │ ├── order.go # Lógica del agregado Order
│ │ │ └── order_test.go
│ │ └── valueobjects/ # Value Objects inmutables
│ │ ├── address.go
│ │ └── money.go
│ ├── application/ # Capa de aplicación (casos de uso)
│ │ ├── commands/ # Comandos (intención de modificar)
│ │ │ └── create_order.go
│ │ ├── queries/ # Consultas (solo lectura)
│ │ │ └── get_order.go
│ │ └── projections/ # Proyecciones (read models)
│ │ └── orders_projection.go
│ └── infrastructure/ # Capa de infraestructura (adaptadores)
│ ├── eventstore/ # Implementaciones del Event Store
│ │ ├── interface.go # Puerto (interfaz abstracta)
│ │ ├── postgres.go # Adaptador PostgreSQL
│ │ └── inmemory.go # Adaptador en memoria (testing)
│ ├── repository/
│ │ └── order_repository.go
│ └── database/
│ └── postgres.go
├── pkg/ # Código público reutilizable
│ └── errors/
│ └── errors.go
├── go.mod # Definición del módulo y dependencias
├── go.sum # Checksums de dependencias
└── Makefile # Automatización de tareas
La carpeta internal/ es especial en Go: el compilador impide que otros módulos importen paquetes desde ella, garantizando encapsulación.
Eventos Base
Los eventos de dominio son el corazón de Event Sourcing. En Go, usamos interfaces para definir contratos y structs para implementaciones concretas. La interfaz DomainEvent define qué información debe tener cualquier evento del sistema.
// internal/domain/events/base.go
package events
import (
"encoding/json"
"time"
"github.com/google/uuid"
)
type EventMetadata struct {
CorrelationID string `json:"correlationId"`
CausationID string `json:"causationId"`
UserID string `json:"userId,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
func NewMetadata() EventMetadata {
id := uuid.New().String()
return EventMetadata{
CorrelationID: id,
CausationID: id,
Timestamp: time.Now().UTC(),
}
}
type DomainEvent interface {
EventType() string
AggregateID() string
AggregateType() string
Version() int
Payload() interface{}
Metadata() EventMetadata
}
type BaseEvent struct {
ID string `json:"eventId"`
Type string `json:"eventType"`
AggID string `json:"aggregateId"`
AggType string `json:"aggregateType"`
Ver int `json:"version"`
Data interface{} `json:"payload"`
Meta EventMetadata `json:"metadata"`
}
func (e BaseEvent) EventType() string { return e.Type }
func (e BaseEvent) AggregateID() string { return e.AggID }
func (e BaseEvent) AggregateType() string { return e.AggType }
func (e BaseEvent) Version() int { return e.Ver }
func (e BaseEvent) Payload() interface{} { return e.Data }
func (e BaseEvent) Metadata() EventMetadata { return e.Meta }
func NewEvent(
eventType string,
aggregateID string,
aggregateType string,
version int,
payload interface{},
) BaseEvent {
return BaseEvent{
ID: uuid.New().String(),
Type: eventType,
AggID: aggregateID,
AggType: aggregateType,
Ver: version,
Data: payload,
Meta: NewMetadata(),
}
}
// Serialización
func (e BaseEvent) MarshalPayload() ([]byte, error) {
return json.Marshal(e.Data)
}
func (e BaseEvent) MarshalMetadata() ([]byte, error) {
return json.Marshal(e.Meta)
}
Value Objects
Los Value Objects son objetos inmutables que se definen por sus atributos, no por una identidad. En Go, usamos structs con validación en el constructor. La inmutabilidad se logra devolviendo nuevas instancias en lugar de modificar la existente.
// internal/domain/valueobjects/money.go
package valueobjects
import "errors"
type Money struct {
Amount float64 `json:"amount" validate:"gte=0"`
Currency string `json:"currency" validate:"len=3"`
}
func NewMoney(amount float64, currency string) (Money, error) {
if amount < 0 {
return Money{}, errors.New("amount cannot be negative")
}
if len(currency) != 3 {
return Money{}, errors.New("currency must be 3 characters")
}
return Money{Amount: amount, Currency: currency}, nil
}
func (m Money) Add(other Money) (Money, error) {
if m.Currency != other.Currency {
return Money{}, errors.New("cannot add different currencies")
}
return Money{Amount: m.Amount + other.Amount, Currency: m.Currency}, nil
}
func (m Money) Multiply(factor float64) Money {
return Money{Amount: m.Amount * factor, Currency: m.Currency}
}
// internal/domain/valueobjects/address.go
package valueobjects
type Address struct {
Street string `json:"street" validate:"required"`
City string `json:"city" validate:"required"`
State string `json:"state" validate:"required"`
ZipCode string `json:"zipCode" validate:"required"`
Country string `json:"country" validate:"required,len=2"`
}
func NewAddress(street, city, state, zipCode, country string) (Address, error) {
addr := Address{
Street: street,
City: city,
State: state,
ZipCode: zipCode,
Country: country,
}
// Validación básica
if street == "" || city == "" || state == "" || zipCode == "" {
return Address{}, errors.New("all address fields are required")
}
return addr, nil
}
Eventos de Order
// internal/domain/events/order_events.go
package events
import (
"time"
vo "github.com/siemprelisto/orderflow-es-go/internal/domain/valueobjects"
)
// OrderCreated
type OrderCreatedPayload struct {
CustomerID string `json:"customerId"`
CustomerEmail string `json:"customerEmail"`
Items []OrderItem `json:"items"`
ShippingAddress vo.Address `json:"shippingAddress"`
Subtotal vo.Money `json:"subtotal"`
Tax vo.Money `json:"tax"`
Total vo.Money `json:"total"`
}
type OrderItem struct {
ProductID string `json:"productId"`
ProductName string `json:"productName"`
SKU string `json:"sku"`
Quantity int `json:"quantity"`
UnitPrice vo.Money `json:"unitPrice"`
}
func NewOrderCreated(orderID string, version int, payload OrderCreatedPayload) BaseEvent {
return NewEvent("OrderCreated", orderID, "Order", version, payload)
}
// OrderItemAdded
type OrderItemAddedPayload struct {
Item OrderItem `json:"item"`
NewSubtotal vo.Money `json:"newSubtotal"`
NewTotal vo.Money `json:"newTotal"`
}
func NewOrderItemAdded(orderID string, version int, payload OrderItemAddedPayload) BaseEvent {
return NewEvent("OrderItemAdded", orderID, "Order", version, payload)
}
// OrderConfirmed
type OrderConfirmedPayload struct {
ConfirmedAt time.Time `json:"confirmedAt"`
EstimatedDelivery time.Time `json:"estimatedDelivery"`
}
func NewOrderConfirmed(orderID string, version int, payload OrderConfirmedPayload) BaseEvent {
return NewEvent("OrderConfirmed", orderID, "Order", version, payload)
}
// PaymentReceived
type PaymentReceivedPayload struct {
PaymentID string `json:"paymentId"`
Amount vo.Money `json:"amount"`
Method string `json:"method"`
TransactionID string `json:"transactionId"`
PaidAt time.Time `json:"paidAt"`
}
// OrderShipped
type OrderShippedPayload struct {
TrackingNumber string `json:"trackingNumber"`
Carrier string `json:"carrier"`
ShippedAt time.Time `json:"shippedAt"`
EstimatedDelivery time.Time `json:"estimatedDelivery"`
}
// OrderCancelled
type OrderCancelledPayload struct {
Reason string `json:"reason"`
CancelledBy string `json:"cancelledBy"`
CancelledAt time.Time `json:"cancelledAt"`
RefundRequired bool `json:"refundRequired"`
}
Event Store Interface
En Go, las interfaces se definen implícitamente: cualquier tipo que implemente todos los métodos de una interfaz la satisface automáticamente. Esto permite crear puertos (interfaces) en el dominio que la infraestructura implementa sin dependencias directas.
El EventStore es el puerto central para persistir y recuperar eventos. Define las operaciones fundamentales de append-only.
// internal/infrastructure/eventstore/interface.go
package eventstore
import (
"context"
"time"
"github.com/siemprelisto/orderflow-es-go/internal/domain/events"
)
type StoredEvent struct {
GlobalPosition int64
StreamID string
StreamPosition int
EventType string
Data []byte
Metadata []byte
CreatedAt time.Time
}
type AppendResult struct {
NextExpectedVersion int
GlobalPosition int64
EventsAppended int
}
type ConcurrencyError struct {
StreamID string
ExpectedVersion int
ActualVersion int
}
func (e ConcurrencyError) Error() string {
return fmt.Sprintf(
"concurrency conflict on stream %s: expected %d, actual %d",
e.StreamID, e.ExpectedVersion, e.ActualVersion,
)
}
type EventStore interface {
Append(
ctx context.Context,
streamID string,
events []events.DomainEvent,
expectedVersion int,
) (AppendResult, error)
ReadStream(
ctx context.Context,
streamID string,
) ([]StoredEvent, error)
ReadStreamFrom(
ctx context.Context,
streamID string,
fromVersion int,
) ([]StoredEvent, error)
ReadAll(
ctx context.Context,
fromPosition int64,
limit int,
) ([]StoredEvent, error)
GetStreamVersion(
ctx context.Context,
streamID string,
) (int, error)
}
Makefile
# Makefile
.PHONY: build run test lint migrate
build:
go build -o bin/api ./cmd/api
run:
go run ./cmd/api
test:
go test -v ./...
test-coverage:
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
lint:
golangci-lint run
migrate-up:
migrate -path ./migrations -database "postgres://localhost:5432/orderflow?sslmode=disable" up
migrate-down:
migrate -path ./migrations -database "postgres://localhost:5432/orderflow?sslmode=disable" down
docker-up:
docker-compose up -d
docker-down:
docker-compose down
Main Entry Point
El punto de entrada configura dependencias y arranca el servidor. Implementamos graceful shutdown para cerrar conexiones limpiamente cuando el proceso recibe señales del sistema operativo (SIGINT, SIGTERM).
// cmd/api/main.go
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
func main() {
r := chi.NewRouter()
// Middleware
r.Use(middleware.Logger)
r.Use(middleware.Recoverer)
r.Use(middleware.Timeout(30 * time.Second))
// Health check
r.Get("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok"}`))
})
// Server
srv := &http.Server{
Addr: ":3000",
Handler: r,
}
// Graceful shutdown
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Printf("Server shutdown error: %v", err)
}
}()
log.Println("Server starting on :3000")
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}
Resumen
- Estructura hexagonal con separación clara de capas
- Value Objects inmutables para el dominio
- Eventos tipados con payloads específicos
- Interface del Event Store para múltiples implementaciones
- Chi como router HTTP ligero
Glosario
Módulo Go (go.mod)
Definición: Unidad de distribución de código en Go que agrupa paquetes relacionados y define sus dependencias.
Por qué es importante: Permite gestionar versiones de dependencias de forma reproducible y aísla tu código de conflictos con otros proyectos.
Ejemplo práctico: go mod init github.com/mi-org/mi-proyecto crea un módulo. Luego go get agrega dependencias al archivo go.mod automáticamente.
Goroutine
Definición: Hilo ligero gestionado por el runtime de Go, no por el sistema operativo. Consume aproximadamente 2KB de stack inicial.
Por qué es importante: Permite manejar miles de operaciones concurrentes (como procesar eventos) con bajo overhead de memoria.
Ejemplo práctico: go func() { procesarEvento(e) }() ejecuta el procesamiento en paralelo sin bloquear el hilo principal.
Interface en Go
Definición: Contrato que define un conjunto de métodos sin implementación. En Go, las interfaces se satisfacen implícitamente.
Por qué es importante: Permite crear abstracciones (puertos) que desacoplan el dominio de la infraestructura, facilitando testing y cambio de implementaciones.
Ejemplo práctico: EventStore es una interfaz; PostgresEventStore y InMemoryEventStore son implementaciones que la satisfacen automáticamente.
Carpeta internal/
Definición: Directorio especial en Go cuyo código no puede ser importado por módulos externos.
Por qué es importante: Garantiza encapsulación a nivel de compilador, protegiendo la lógica interna del proyecto de uso externo no autorizado.
Ejemplo práctico: El código en internal/domain/ solo es accesible desde dentro de tu módulo, no desde otros proyectos que lo importen.
Graceful Shutdown
Definición: Proceso de cerrar una aplicación de manera ordenada, completando operaciones en curso antes de terminar.
Por qué es importante: Evita pérdida de datos y conexiones huérfanas cuando el servidor se reinicia o escala.
Ejemplo práctico: Al recibir SIGTERM, el servidor deja de aceptar nuevas conexiones pero espera hasta 10 segundos para que las existentes terminen.
Chi Router
Definición: Enrutador HTTP ligero y composable para Go, compatible con el estándar net/http.
Por qué es importante: Ofrece middleware, agrupación de rutas y parámetros URL sin dependencias pesadas ni magia.
Ejemplo práctico: r.Get("/orders/{id}", handler) define una ruta con parámetro dinámico que Chi extrae automáticamente.
← Capítulo 10: Proyecciones Tiempo Real | Capítulo 12: Event Store con MongoDB en Go →