← Volver al listado de tecnologías

Capítulo 15: Workflows y Activities en Go

Por: SiempreListo
sagagotemporalworkflowsactivities

Capítulo 15: Workflows y Activities en Go

“Activities hacen el trabajo, Workflows coordinan”

Introduccion

El capitulo anterior introdujo los conceptos basicos de Temporal. Ahora profundizaremos en patrones avanzados que hacen las sagas mas robustas y flexibles: heartbeats para operaciones largas, signals para modificacion en tiempo real, queries para consultar estado, y child workflows para dividir logica compleja.

Estos patrones son fundamentales para sistemas de produccion donde las sagas pueden durar minutos u horas, y donde necesitamos visibilidad y control granular sobre su ejecucion.

Workflows vs Activities

AspectoWorkflowActivity
DeterminismoDebe ser deterministicoPuede tener efectos secundarios
I/ONo permitido directoPermitido (DB, HTTP, etc.)
EstadoAutomaticamente persistidoSin persistencia automatica
ReintentosReplay desde historialReintentos configurables

La distincion clave es que los workflows solo toman decisiones (orquestan), mientras que las activities ejecutan el trabajo real. Esta separacion permite a Temporal persistir y reproducir el estado del workflow sin re-ejecutar operaciones con efectos secundarios.

Activities con Heartbeat

Un heartbeat (latido) es una senal periodica que una activity envia al servidor Temporal para indicar que sigue viva y progresando. Es crucial para operaciones largas porque:

  1. Deteccion de fallos: Si Temporal no recibe heartbeats, asume que el worker fallo y reasigna la tarea
  2. Reporte de progreso: El heartbeat puede incluir datos sobre el avance
  3. Continuacion desde punto de fallo: Si la activity se reinicia, puede consultar el ultimo heartbeat y continuar desde ahi

Para operaciones largas, reporta progreso:

// activities_advanced.go
package main

import (
    "context"
    "time"
    "go.temporal.io/sdk/activity"
)

type ShippingActivities struct{}

type ShipmentDetails struct {
    ShipmentID string
    TrackingNo string
    Carrier    string
    Status     string
}

func (a *ShippingActivities) ProcessLargeShipment(ctx context.Context, orderID string, items []OrderItem) (*ShipmentDetails, error) {
    logger := activity.GetLogger(ctx)

    for i, item := range items {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }

        // Procesar item
        logger.Info("Processing item", "index", i, "productID", item.ProductID)
        time.Sleep(time.Second) // Simular trabajo

        // Reportar progreso
        activity.RecordHeartbeat(ctx, i+1)
    }

    return &ShipmentDetails{
        ShipmentID: generateID(),
        TrackingNo: "TRK" + generateID()[:8],
        Carrier:    "FastShip",
        Status:     "scheduled",
    }, nil
}

Workflow con Signals

Un Signal es un mensaje asincrono que se envia a un workflow en ejecucion para modificar su comportamiento o inyectar datos. A diferencia de las activities (que el workflow inicia), los signals vienen del exterior.

Casos de uso comunes:

Los signals se reciben a traves de un channel (canal), una primitiva de concurrencia de Go que permite comunicacion segura entre goroutines.

// workflow_signals.go
package main

import (
    "go.temporal.io/sdk/workflow"
)

type OrderStatus struct {
    OrderID   string
    Status    string
    UpdatedAt int64
}

func OrderWithUpdatesWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
    logger := workflow.GetLogger(ctx)

    var status OrderStatus
    statusChannel := workflow.GetSignalChannel(ctx, "order-status-update")

    // Goroutine para manejar signals
    workflow.Go(ctx, func(ctx workflow.Context) {
        for {
            var update OrderStatus
            statusChannel.Receive(ctx, &update)
            status = update
            logger.Info("Status updated", "status", status.Status)
        }
    })

    // Ejecutar saga normalmente
    result, err := executeSagaSteps(ctx, input)
    if err != nil {
        return nil, err
    }

    return result, nil
}

// Enviar signal desde cliente
func sendStatusUpdate(workflowID string, status OrderStatus) error {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    return c.SignalWorkflow(context.Background(), workflowID, "", "order-status-update", status)
}

Workflow con Queries

Una Query permite consultar el estado interno de un workflow sin modificarlo. Es una operacion sincrona y de solo lectura, ideal para:

A diferencia de signals (que modifican estado), las queries solo leen. Son seguras de ejecutar multiples veces sin efectos secundarios.

El Query Handler es una funcion registrada en el workflow que responde a consultas externas. Debe ser rapido y no bloquear.

// workflow_queries.go
package main

import "go.temporal.io/sdk/workflow"

type SagaProgress struct {
    CurrentStep    string
    CompletedSteps []string
    FailedStep     string
    IsCompensating bool
}

func OrderWithQueryWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
    progress := &SagaProgress{
        CompletedSteps: make([]string, 0),
    }

    // Registrar query handler
    err := workflow.SetQueryHandler(ctx, "get-progress", func() (*SagaProgress, error) {
        return progress, nil
    })
    if err != nil {
        return nil, err
    }

    // Paso 1
    progress.CurrentStep = "create-order"
    // ... ejecutar
    progress.CompletedSteps = append(progress.CompletedSteps, "create-order")

    // Paso 2
    progress.CurrentStep = "reserve-stock"
    // ... ejecutar
    progress.CompletedSteps = append(progress.CompletedSteps, "reserve-stock")

    // Continuar con otros pasos...
    return &CreateOrderResult{Status: "completed"}, nil
}

// Consultar desde cliente
func queryProgress(workflowID string) (*SagaProgress, error) {
    c, _ := client.Dial(client.Options{})
    defer c.Close()

    resp, err := c.QueryWorkflow(context.Background(), workflowID, "", "get-progress")
    if err != nil {
        return nil, err
    }

    var progress SagaProgress
    err = resp.Get(&progress)
    return &progress, err
}

Child Workflows

Un Child Workflow es un workflow iniciado por otro workflow (el “padre”). Permite dividir logica compleja en piezas mas pequenas y manejables.

Beneficios de usar child workflows:

El child workflow hereda el contexto del padre pero tiene su propia identidad y ciclo de vida.

// child_workflows.go
package main

import (
    "time"
    "go.temporal.io/sdk/workflow"
)

func MainOrderWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
    childOptions := workflow.ChildWorkflowOptions{
        WorkflowID: "payment-" + workflow.GetInfo(ctx).WorkflowExecution.ID,
    }
    childCtx := workflow.WithChildOptions(ctx, childOptions)

    // Ejecutar workflow de pago como child
    var paymentResult PaymentResult
    err := workflow.ExecuteChildWorkflow(childCtx, PaymentWorkflow, PaymentInput{
        OrderID: input.CustomerID,
        Amount:  input.Total,
    }).Get(ctx, &paymentResult)

    if err != nil {
        return nil, err
    }

    return &CreateOrderResult{
        PaymentID: paymentResult.PaymentID,
        Status:    "completed",
    }, nil
}

type PaymentInput struct {
    OrderID string
    Amount  float64
}

type PaymentResult struct {
    PaymentID string
    Status    string
}

func PaymentWorkflow(ctx workflow.Context, input PaymentInput) (*PaymentResult, error) {
    options := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
    }
    ctx = workflow.WithActivityOptions(ctx, options)

    var paymentAct *PaymentActivities
    var paymentID string

    err := workflow.ExecuteActivity(ctx, paymentAct.ProcessPayment, input.OrderID, input.Amount).Get(ctx, &paymentID)
    if err != nil {
        return nil, err
    }

    return &PaymentResult{
        PaymentID: paymentID,
        Status:    "processed",
    }, nil
}

Retry Policies Avanzadas

Las Retry Policies pueden configurarse con granularidad fina. Un aspecto importante es distinguir entre errores retriables y no retriables:

NonRetryableErrorTypes define tipos de error que no deben reintentarse. Esto evita gastar recursos reintentando operaciones que sabemos que fallaran.

// retry_policies.go
package main

import (
    "time"
    "go.temporal.io/sdk/temporal"
    "go.temporal.io/sdk/workflow"
)

func configuredActivityOptions() workflow.ActivityOptions {
    return workflow.ActivityOptions{
        StartToCloseTimeout: 5 * time.Minute,
        HeartbeatTimeout:    30 * time.Second,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:        time.Second,
            BackoffCoefficient:     2.0,
            MaximumInterval:        time.Minute,
            MaximumAttempts:        5,
            NonRetryableErrorTypes: []string{"InvalidInputError", "InsufficientFundsError"},
        },
    }
}

// Error no retriable
type InvalidInputError struct {
    Message string
}

func (e *InvalidInputError) Error() string {
    return e.Message
}

func (a *PaymentActivities) ValidatePayment(ctx context.Context, amount float64) error {
    if amount <= 0 {
        return temporal.NewNonRetryableApplicationError(
            "Invalid amount",
            "InvalidInputError",
            &InvalidInputError{Message: "Amount must be positive"},
        )
    }
    return nil
}

Resumen

Glosario

Heartbeat

Definicion: Senal periodica que una activity envia al servidor Temporal para indicar que sigue viva y progresando en una operacion de larga duracion.

Por que es importante: Permite detectar workers que fallaron silenciosamente y reasignar la tarea. Tambien permite reportar progreso y continuar desde el ultimo punto conocido tras un reinicio.

Ejemplo practico: Una activity que procesa 1000 items envia un heartbeat cada 10 items. Si el worker falla en el item 500, el siguiente worker puede continuar desde ahi en lugar de empezar de cero.


Signal (Temporal)

Definicion: Mensaje asincrono enviado a un workflow en ejecucion desde el exterior para modificar su estado o comportamiento.

Por que es importante: Permite interaccion en tiempo real con workflows de larga duracion, como aprobar transacciones, actualizar datos o cancelar procesos.

Ejemplo practico: Un workflow de aprobacion de prestamo espera un signal “aprobar” o “rechazar” de un agente humano antes de continuar.


Query (Temporal)

Definicion: Operacion sincrona de solo lectura que consulta el estado interno de un workflow en ejecucion sin modificarlo.

Por que es importante: Proporciona visibilidad en tiempo real del progreso de un workflow para UIs, monitoreo y debugging sin afectar su ejecucion.

Ejemplo practico: Un endpoint REST consulta el progreso de una saga con QueryWorkflow("get-progress") y retorna “3 de 5 pasos completados”.


Child Workflow

Definicion: Workflow iniciado y coordinado por otro workflow (el padre), que tiene su propia identidad, historial y ciclo de vida.

Por que es importante: Permite descomponer workflows complejos en unidades mas pequenas, facilitando el mantenimiento, reutilizacion y testing.

Ejemplo practico: Un OrderWorkflow inicia un PaymentWorkflow como child. Si el pago falla, el child reporta el error al padre, que decide si compensar o reintentar.


Errores No Retriables

Definicion: Tipos de error que indican fallos permanentes y que no deben reintentarse porque el resultado sera siempre el mismo.

Por que es importante: Evita gastar tiempo y recursos reintentando operaciones condenadas al fracaso, acelerando la deteccion de problemas reales.

Ejemplo practico: Si ProcessPayment retorna “tarjeta invalida”, no tiene sentido reintentar. Se marca como NonRetryableApplicationError y la saga pasa directamente a compensacion.


StartToCloseTimeout

Definicion: Tiempo maximo permitido para que una activity complete su ejecucion desde que un worker la toma hasta que reporta exito o fallo.

Por que es importante: Protege contra activities que se quedan colgadas indefinidamente, garantizando que eventualmente Temporal detecte el problema y reintente.

Ejemplo practico: Con StartToCloseTimeout: 1 minute, si una llamada HTTP a un servicio externo no responde en 60 segundos, Temporal la marca como fallida y reintenta.


← Capítulo 14: Go con Temporal | Capítulo 16: Sagas en Python →