← Volver al listado de tecnologías

Capítulo 14: Sagas en Go con Temporal.io

Por: SiempreListo
sagagotemporalworkflowsorquestación

Capítulo 14: Sagas en Go con Temporal.io

“Temporal convierte flujos distribuidos complejos en codigo simple”

Introduccion

Hasta ahora hemos construido nuestra propia infraestructura para sagas: persistencia, reintentos, compensaciones. Pero existe una alternativa: usar una plataforma de orquestacion que maneja todo esto automaticamente.

Temporal.io es exactamente eso: una plataforma open-source disenada especificamente para ejecutar flujos de trabajo duraderos y distribuidos. En lugar de escribir codigo para persistir estado, manejar reintentos y coordinar compensaciones, Temporal lo hace por nosotros.

Go (o Golang) es un lenguaje de programacion creado por Google, conocido por su simplicidad, rendimiento y excelente soporte para concurrencia. Es muy popular en sistemas distribuidos y microservicios.

Que es Temporal.io

Temporal es una plataforma de orquestacion que:

Arquitectura Temporal

La arquitectura de Temporal separa responsabilidades claramente:

Los workers se conectan al servidor y “escuchan” tareas. Cuando hay trabajo, el servidor asigna tareas a workers disponibles. Si un worker falla, el servidor reasigna la tarea a otro.

graph LR
    C[Cliente] --> TS[Temporal Server]
    TS --> W1[Worker 1]
    TS --> W2[Worker 2]
    W1 --> S1[Order Service]
    W2 --> S2[Payment Service]

Configuracion del Proyecto

Primero configuramos un proyecto Go con el SDK de Temporal. El comando go mod init crea un modulo Go, y go get descarga la dependencia de Temporal.

Un Task Queue es una cola nombrada donde el servidor coloca tareas y los workers las recogen. Multiples workers pueden escuchar la misma cola para escalabilidad.

# Instalar dependencias
go mod init orderflow
go get go.temporal.io/sdk
// main.go
package main

import (
    "log"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    c, err := client.Dial(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()

    w := worker.New(c, "order-task-queue", worker.Options{})

    w.RegisterWorkflow(CreateOrderWorkflow)
    w.RegisterActivity(&OrderActivities{})
    w.RegisterActivity(&InventoryActivities{})
    w.RegisterActivity(&PaymentActivities{})

    if err := w.Run(worker.InterruptCh()); err != nil {
        log.Fatalln("Unable to start worker", err)
    }
}

Definicion de Activities

En Temporal, una Activity es una unidad de trabajo que puede tener efectos secundarios: llamar APIs externas, escribir en base de datos, enviar emails. Las activities pueden fallar y Temporal las reintentara automaticamente segun la politica configurada.

Las activities se agrupan en structs para organizacion. Cada activity es un metodo que recibe un context (para cancelacion y timeouts) y retorna un resultado o error.

// activities.go
package main

import (
    "context"
    "errors"
)

type OrderActivities struct{}
type InventoryActivities struct{}
type PaymentActivities struct{}

type Order struct {
    ID         string
    CustomerID string
    Items      []OrderItem
    Total      float64
    Status     string
}

type OrderItem struct {
    ProductID string
    Quantity  int
    Price     float64
}

// Order Activities
func (a *OrderActivities) CreateOrder(ctx context.Context, input CreateOrderInput) (*Order, error) {
    order := &Order{
        ID:         generateID(),
        CustomerID: input.CustomerID,
        Items:      input.Items,
        Total:      input.Total,
        Status:     "pending",
    }
    // Guardar en DB
    return order, nil
}

func (a *OrderActivities) CancelOrder(ctx context.Context, orderID string) error {
    // Actualizar estado a "cancelled"
    return nil
}

func (a *OrderActivities) CompleteOrder(ctx context.Context, orderID string) error {
    // Actualizar estado a "completed"
    return nil
}

// Inventory Activities
func (a *InventoryActivities) ReserveStock(ctx context.Context, orderID string, items []OrderItem) (string, error) {
    for _, item := range items {
        available, _ := checkStock(item.ProductID)
        if available < item.Quantity {
            return "", errors.New("insufficient stock")
        }
    }
    reservationID := generateID()
    // Crear reserva en DB
    return reservationID, nil
}

func (a *InventoryActivities) ReleaseStock(ctx context.Context, reservationID string) error {
    // Liberar reserva
    return nil
}

// Payment Activities
func (a *PaymentActivities) ProcessPayment(ctx context.Context, orderID string, amount float64) (string, error) {
    // Llamar a pasarela de pago
    paymentID := generateID()
    return paymentID, nil
}

func (a *PaymentActivities) RefundPayment(ctx context.Context, paymentID string) error {
    // Procesar reembolso
    return nil
}

Workflow con Saga

Un Workflow en Temporal es el orquestador que coordina activities. A diferencia de las activities, los workflows deben ser deterministicos: dado el mismo input, siempre producen la misma secuencia de comandos.

La Retry Policy (politica de reintentos) define como Temporal reintenta activities fallidas:

El array compensations acumula funciones de compensacion. Si cualquier paso falla, ejecutamos todas las compensaciones en orden inverso (LIFO - Last In, First Out).

// workflow.go
package main

import (
    "time"
    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

type CreateOrderInput struct {
    CustomerID string
    Items      []OrderItem
    Total      float64
}

type CreateOrderResult struct {
    OrderID   string
    PaymentID string
    Status    string
}

func CreateOrderWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
    options := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Minute,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, options)

    var orderAct *OrderActivities
    var inventoryAct *InventoryActivities
    var paymentAct *PaymentActivities

    // Saga: track de compensaciones
    var compensations []func(ctx workflow.Context) error

    // Paso 1: Crear orden
    var order *Order
    err := workflow.ExecuteActivity(ctx, orderAct.CreateOrder, input).Get(ctx, &order)
    if err != nil {
        return nil, err
    }
    compensations = append(compensations, func(ctx workflow.Context) error {
        return workflow.ExecuteActivity(ctx, orderAct.CancelOrder, order.ID).Get(ctx, nil)
    })

    // Paso 2: Reservar stock
    var reservationID string
    err = workflow.ExecuteActivity(ctx, inventoryAct.ReserveStock, order.ID, input.Items).Get(ctx, &reservationID)
    if err != nil {
        runCompensations(ctx, compensations)
        return nil, err
    }
    compensations = append(compensations, func(ctx workflow.Context) error {
        return workflow.ExecuteActivity(ctx, inventoryAct.ReleaseStock, reservationID).Get(ctx, nil)
    })

    // Paso 3: Procesar pago
    var paymentID string
    err = workflow.ExecuteActivity(ctx, paymentAct.ProcessPayment, order.ID, input.Total).Get(ctx, &paymentID)
    if err != nil {
        runCompensations(ctx, compensations)
        return nil, err
    }

    // Paso 4: Completar orden
    err = workflow.ExecuteActivity(ctx, orderAct.CompleteOrder, order.ID).Get(ctx, nil)
    if err != nil {
        // Agregar refund a compensaciones antes de ejecutar
        compensations = append(compensations, func(ctx workflow.Context) error {
            return workflow.ExecuteActivity(ctx, paymentAct.RefundPayment, paymentID).Get(ctx, nil)
        })
        runCompensations(ctx, compensations)
        return nil, err
    }

    return &CreateOrderResult{
        OrderID:   order.ID,
        PaymentID: paymentID,
        Status:    "completed",
    }, nil
}

func runCompensations(ctx workflow.Context, compensations []func(workflow.Context) error) {
    for i := len(compensations) - 1; i >= 0; i-- {
        if err := compensations[i](ctx); err != nil {
            workflow.GetLogger(ctx).Error("Compensation failed", "error", err)
        }
    }
}

Cliente para Iniciar Workflows

El cliente de Temporal permite iniciar workflows y consultar su estado. ExecuteWorkflow envia la solicitud al servidor Temporal y retorna un “handle” que permite esperar el resultado o consultar el progreso.

StartWorkflowOptions configura:

// client.go
package main

import (
    "context"
    "log"
    "go.temporal.io/sdk/client"
)

func startOrder(input CreateOrderInput) (*CreateOrderResult, error) {
    c, err := client.Dial(client.Options{})
    if err != nil {
        return nil, err
    }
    defer c.Close()

    options := client.StartWorkflowOptions{
        ID:        "order-" + generateID(),
        TaskQueue: "order-task-queue",
    }

    we, err := c.ExecuteWorkflow(context.Background(), options, CreateOrderWorkflow, input)
    if err != nil {
        return nil, err
    }

    log.Printf("Started workflow: %s, RunID: %s\n", we.GetID(), we.GetRunID())

    var result CreateOrderResult
    err = we.Get(context.Background(), &result)
    return &result, err
}

Monitoreo con Temporal UI

Temporal incluye una UI web que proporciona visibilidad completa sobre los workflows. Puedes ver el historial de eventos de cada workflow, incluyendo que activities se ejecutaron, cuanto tardaron, y si hubo reintentos o errores.

Funcionalidades principales:

Resumen

Glosario

Temporal.io

Definicion: Plataforma open-source de orquestacion de workflows que proporciona durabilidad, reintentos y persistencia de estado automaticos para flujos de trabajo distribuidos.

Por que es importante: Elimina la necesidad de construir infraestructura propia para sagas, reduciendo codigo y complejidad mientras garantiza confiabilidad.

Ejemplo practico: En lugar de escribir 200 lineas de codigo para persistencia y recovery, Temporal lo maneja automaticamente mientras tu solo escribes la logica de negocio.


Activity (Temporal)

Definicion: Unidad de trabajo en Temporal que puede tener efectos secundarios como llamadas a APIs, operaciones de base de datos o envio de notificaciones.

Por que es importante: Las activities encapsulan operaciones que pueden fallar. Temporal las reintenta automaticamente y rastrea su estado, haciendo el sistema mas resiliente.

Ejemplo practico: ProcessPayment es una activity que llama a Stripe. Si falla por timeout, Temporal la reintenta hasta 3 veces con backoff exponencial.


Workflow (Temporal)

Definicion: Funcion deterministica en Temporal que orquesta la ejecucion de activities y define la logica de flujo de un proceso de negocio.

Por que es importante: Los workflows son el corazon de Temporal. Su estado se persiste automaticamente, permitiendo recuperacion ante cualquier fallo.

Ejemplo practico: CreateOrderWorkflow coordina crear orden, reservar stock, procesar pago y confirmar envio, manejando compensaciones si algo falla.


Task Queue

Definicion: Cola nombrada en Temporal donde el servidor coloca tareas (workflows y activities) y los workers las recogen para ejecutar.

Por que es importante: Permite escalar horizontalmente agregando mas workers a la misma cola, distribuyendo la carga automaticamente.

Ejemplo practico: order-task-queue tiene 5 workers escuchando. Cuando llegan muchos pedidos, todos los workers procesan en paralelo.


Retry Policy

Definicion: Configuracion que define como Temporal reintenta activities fallidas, incluyendo intervalos, backoff y numero maximo de intentos.

Por que es importante: Permite recuperarse de fallos transitorios (timeouts de red, servicios ocupados) sin intervencion manual.

Ejemplo practico: Una policy con InitialInterval: 1s, BackoffCoefficient: 2.0 y MaximumAttempts: 5 reintenta a 1s, 2s, 4s, 8s antes de fallar definitivamente.


Determinismo (en Workflows)

Definicion: Propiedad que requiere que un workflow produzca siempre la misma secuencia de comandos dado el mismo input, sin depender de tiempo real, numeros aleatorios o I/O directo.

Por que es importante: Temporal “reproduce” workflows para reconstruir su estado. Si el codigo no es deterministico, la reproduccion puede dar resultados diferentes y corromper el estado.

Ejemplo practico: En lugar de time.Now() (no deterministico), usa workflow.Now(ctx) que retorna el tiempo del historial durante replay.


← Capítulo 13: Estado y Persistencia | Capítulo 15: Workflows y Activities →