Capítulo 14: Sagas en Go con Temporal.io
Capítulo 14: Sagas en Go con Temporal.io
“Temporal convierte flujos distribuidos complejos en codigo simple”
Introduccion
Hasta ahora hemos construido nuestra propia infraestructura para sagas: persistencia, reintentos, compensaciones. Pero existe una alternativa: usar una plataforma de orquestacion que maneja todo esto automaticamente.
Temporal.io es exactamente eso: una plataforma open-source disenada especificamente para ejecutar flujos de trabajo duraderos y distribuidos. En lugar de escribir codigo para persistir estado, manejar reintentos y coordinar compensaciones, Temporal lo hace por nosotros.
Go (o Golang) es un lenguaje de programacion creado por Google, conocido por su simplicidad, rendimiento y excelente soporte para concurrencia. Es muy popular en sistemas distribuidos y microservicios.
Que es Temporal.io
Temporal es una plataforma de orquestacion que:
- Persiste automáticamente el estado de workflows
- Reintenta automáticamente operaciones fallidas
- Resume workflows tras reinicios del servidor
- Maneja compensaciones de forma nativa
Arquitectura Temporal
La arquitectura de Temporal separa responsabilidades claramente:
- Temporal Server: El cerebro que persiste estado y coordina la ejecucion
- Workers: Procesos que ejecutan la logica de negocio real
- Cliente: Inicia workflows y consulta su estado
Los workers se conectan al servidor y “escuchan” tareas. Cuando hay trabajo, el servidor asigna tareas a workers disponibles. Si un worker falla, el servidor reasigna la tarea a otro.
graph LR
C[Cliente] --> TS[Temporal Server]
TS --> W1[Worker 1]
TS --> W2[Worker 2]
W1 --> S1[Order Service]
W2 --> S2[Payment Service]
Configuracion del Proyecto
Primero configuramos un proyecto Go con el SDK de Temporal. El comando go mod init crea un modulo Go, y go get descarga la dependencia de Temporal.
Un Task Queue es una cola nombrada donde el servidor coloca tareas y los workers las recogen. Multiples workers pueden escuchar la misma cola para escalabilidad.
# Instalar dependencias
go mod init orderflow
go get go.temporal.io/sdk
// main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
func main() {
c, err := client.Dial(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
w := worker.New(c, "order-task-queue", worker.Options{})
w.RegisterWorkflow(CreateOrderWorkflow)
w.RegisterActivity(&OrderActivities{})
w.RegisterActivity(&InventoryActivities{})
w.RegisterActivity(&PaymentActivities{})
if err := w.Run(worker.InterruptCh()); err != nil {
log.Fatalln("Unable to start worker", err)
}
}
Definicion de Activities
En Temporal, una Activity es una unidad de trabajo que puede tener efectos secundarios: llamar APIs externas, escribir en base de datos, enviar emails. Las activities pueden fallar y Temporal las reintentara automaticamente segun la politica configurada.
Las activities se agrupan en structs para organizacion. Cada activity es un metodo que recibe un context (para cancelacion y timeouts) y retorna un resultado o error.
// activities.go
package main
import (
"context"
"errors"
)
type OrderActivities struct{}
type InventoryActivities struct{}
type PaymentActivities struct{}
type Order struct {
ID string
CustomerID string
Items []OrderItem
Total float64
Status string
}
type OrderItem struct {
ProductID string
Quantity int
Price float64
}
// Order Activities
func (a *OrderActivities) CreateOrder(ctx context.Context, input CreateOrderInput) (*Order, error) {
order := &Order{
ID: generateID(),
CustomerID: input.CustomerID,
Items: input.Items,
Total: input.Total,
Status: "pending",
}
// Guardar en DB
return order, nil
}
func (a *OrderActivities) CancelOrder(ctx context.Context, orderID string) error {
// Actualizar estado a "cancelled"
return nil
}
func (a *OrderActivities) CompleteOrder(ctx context.Context, orderID string) error {
// Actualizar estado a "completed"
return nil
}
// Inventory Activities
func (a *InventoryActivities) ReserveStock(ctx context.Context, orderID string, items []OrderItem) (string, error) {
for _, item := range items {
available, _ := checkStock(item.ProductID)
if available < item.Quantity {
return "", errors.New("insufficient stock")
}
}
reservationID := generateID()
// Crear reserva en DB
return reservationID, nil
}
func (a *InventoryActivities) ReleaseStock(ctx context.Context, reservationID string) error {
// Liberar reserva
return nil
}
// Payment Activities
func (a *PaymentActivities) ProcessPayment(ctx context.Context, orderID string, amount float64) (string, error) {
// Llamar a pasarela de pago
paymentID := generateID()
return paymentID, nil
}
func (a *PaymentActivities) RefundPayment(ctx context.Context, paymentID string) error {
// Procesar reembolso
return nil
}
Workflow con Saga
Un Workflow en Temporal es el orquestador que coordina activities. A diferencia de las activities, los workflows deben ser deterministicos: dado el mismo input, siempre producen la misma secuencia de comandos.
La Retry Policy (politica de reintentos) define como Temporal reintenta activities fallidas:
InitialInterval: Tiempo antes del primer reintentoBackoffCoefficient: Multiplicador para aumentar el intervalo entre reintentosMaximumAttempts: Numero maximo de intentos
El array compensations acumula funciones de compensacion. Si cualquier paso falla, ejecutamos todas las compensaciones en orden inverso (LIFO - Last In, First Out).
// workflow.go
package main
import (
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
type CreateOrderInput struct {
CustomerID string
Items []OrderItem
Total float64
}
type CreateOrderResult struct {
OrderID string
PaymentID string
Status string
}
func CreateOrderWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, options)
var orderAct *OrderActivities
var inventoryAct *InventoryActivities
var paymentAct *PaymentActivities
// Saga: track de compensaciones
var compensations []func(ctx workflow.Context) error
// Paso 1: Crear orden
var order *Order
err := workflow.ExecuteActivity(ctx, orderAct.CreateOrder, input).Get(ctx, &order)
if err != nil {
return nil, err
}
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(ctx, orderAct.CancelOrder, order.ID).Get(ctx, nil)
})
// Paso 2: Reservar stock
var reservationID string
err = workflow.ExecuteActivity(ctx, inventoryAct.ReserveStock, order.ID, input.Items).Get(ctx, &reservationID)
if err != nil {
runCompensations(ctx, compensations)
return nil, err
}
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(ctx, inventoryAct.ReleaseStock, reservationID).Get(ctx, nil)
})
// Paso 3: Procesar pago
var paymentID string
err = workflow.ExecuteActivity(ctx, paymentAct.ProcessPayment, order.ID, input.Total).Get(ctx, &paymentID)
if err != nil {
runCompensations(ctx, compensations)
return nil, err
}
// Paso 4: Completar orden
err = workflow.ExecuteActivity(ctx, orderAct.CompleteOrder, order.ID).Get(ctx, nil)
if err != nil {
// Agregar refund a compensaciones antes de ejecutar
compensations = append(compensations, func(ctx workflow.Context) error {
return workflow.ExecuteActivity(ctx, paymentAct.RefundPayment, paymentID).Get(ctx, nil)
})
runCompensations(ctx, compensations)
return nil, err
}
return &CreateOrderResult{
OrderID: order.ID,
PaymentID: paymentID,
Status: "completed",
}, nil
}
func runCompensations(ctx workflow.Context, compensations []func(workflow.Context) error) {
for i := len(compensations) - 1; i >= 0; i-- {
if err := compensations[i](ctx); err != nil {
workflow.GetLogger(ctx).Error("Compensation failed", "error", err)
}
}
}
Cliente para Iniciar Workflows
El cliente de Temporal permite iniciar workflows y consultar su estado. ExecuteWorkflow envia la solicitud al servidor Temporal y retorna un “handle” que permite esperar el resultado o consultar el progreso.
StartWorkflowOptions configura:
ID: Identificador unico del workflow (util para idempotencia)TaskQueue: La cola donde el servidor colocara las tareas
// client.go
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
)
func startOrder(input CreateOrderInput) (*CreateOrderResult, error) {
c, err := client.Dial(client.Options{})
if err != nil {
return nil, err
}
defer c.Close()
options := client.StartWorkflowOptions{
ID: "order-" + generateID(),
TaskQueue: "order-task-queue",
}
we, err := c.ExecuteWorkflow(context.Background(), options, CreateOrderWorkflow, input)
if err != nil {
return nil, err
}
log.Printf("Started workflow: %s, RunID: %s\n", we.GetID(), we.GetRunID())
var result CreateOrderResult
err = we.Get(context.Background(), &result)
return &result, err
}
Monitoreo con Temporal UI
Temporal incluye una UI web que proporciona visibilidad completa sobre los workflows. Puedes ver el historial de eventos de cada workflow, incluyendo que activities se ejecutaron, cuanto tardaron, y si hubo reintentos o errores.
Funcionalidades principales:
- Ver estado de workflows (running, completed, failed)
- Inspeccionar historial de eventos paso a paso
- Reintentar workflows fallidos manualmente
- Cancelar workflows en progreso
Resumen
- Temporal.io simplifica la implementacion de sagas
- El estado se persiste automaticamente
- Los reintentos son configurables por activity
- Las compensaciones se ejecutan en orden inverso
- La UI web facilita monitoreo y debugging
Glosario
Temporal.io
Definicion: Plataforma open-source de orquestacion de workflows que proporciona durabilidad, reintentos y persistencia de estado automaticos para flujos de trabajo distribuidos.
Por que es importante: Elimina la necesidad de construir infraestructura propia para sagas, reduciendo codigo y complejidad mientras garantiza confiabilidad.
Ejemplo practico: En lugar de escribir 200 lineas de codigo para persistencia y recovery, Temporal lo maneja automaticamente mientras tu solo escribes la logica de negocio.
Activity (Temporal)
Definicion: Unidad de trabajo en Temporal que puede tener efectos secundarios como llamadas a APIs, operaciones de base de datos o envio de notificaciones.
Por que es importante: Las activities encapsulan operaciones que pueden fallar. Temporal las reintenta automaticamente y rastrea su estado, haciendo el sistema mas resiliente.
Ejemplo practico: ProcessPayment es una activity que llama a Stripe. Si falla por timeout, Temporal la reintenta hasta 3 veces con backoff exponencial.
Workflow (Temporal)
Definicion: Funcion deterministica en Temporal que orquesta la ejecucion de activities y define la logica de flujo de un proceso de negocio.
Por que es importante: Los workflows son el corazon de Temporal. Su estado se persiste automaticamente, permitiendo recuperacion ante cualquier fallo.
Ejemplo practico: CreateOrderWorkflow coordina crear orden, reservar stock, procesar pago y confirmar envio, manejando compensaciones si algo falla.
Task Queue
Definicion: Cola nombrada en Temporal donde el servidor coloca tareas (workflows y activities) y los workers las recogen para ejecutar.
Por que es importante: Permite escalar horizontalmente agregando mas workers a la misma cola, distribuyendo la carga automaticamente.
Ejemplo practico: order-task-queue tiene 5 workers escuchando. Cuando llegan muchos pedidos, todos los workers procesan en paralelo.
Retry Policy
Definicion: Configuracion que define como Temporal reintenta activities fallidas, incluyendo intervalos, backoff y numero maximo de intentos.
Por que es importante: Permite recuperarse de fallos transitorios (timeouts de red, servicios ocupados) sin intervencion manual.
Ejemplo practico: Una policy con InitialInterval: 1s, BackoffCoefficient: 2.0 y MaximumAttempts: 5 reintenta a 1s, 2s, 4s, 8s antes de fallar definitivamente.
Determinismo (en Workflows)
Definicion: Propiedad que requiere que un workflow produzca siempre la misma secuencia de comandos dado el mismo input, sin depender de tiempo real, numeros aleatorios o I/O directo.
Por que es importante: Temporal “reproduce” workflows para reconstruir su estado. Si el codigo no es deterministico, la reproduccion puede dar resultados diferentes y corromper el estado.
Ejemplo practico: En lugar de time.Now() (no deterministico), usa workflow.Now(ctx) que retorna el tiempo del historial durante replay.
← Capítulo 13: Estado y Persistencia | Capítulo 15: Workflows y Activities →