Capítulo 15: Workflows y Activities en Go
Capítulo 15: Workflows y Activities en Go
“Activities hacen el trabajo, Workflows coordinan”
Introduccion
El capitulo anterior introdujo los conceptos basicos de Temporal. Ahora profundizaremos en patrones avanzados que hacen las sagas mas robustas y flexibles: heartbeats para operaciones largas, signals para modificacion en tiempo real, queries para consultar estado, y child workflows para dividir logica compleja.
Estos patrones son fundamentales para sistemas de produccion donde las sagas pueden durar minutos u horas, y donde necesitamos visibilidad y control granular sobre su ejecucion.
Workflows vs Activities
| Aspecto | Workflow | Activity |
|---|---|---|
| Determinismo | Debe ser deterministico | Puede tener efectos secundarios |
| I/O | No permitido directo | Permitido (DB, HTTP, etc.) |
| Estado | Automaticamente persistido | Sin persistencia automatica |
| Reintentos | Replay desde historial | Reintentos configurables |
La distincion clave es que los workflows solo toman decisiones (orquestan), mientras que las activities ejecutan el trabajo real. Esta separacion permite a Temporal persistir y reproducir el estado del workflow sin re-ejecutar operaciones con efectos secundarios.
Activities con Heartbeat
Un heartbeat (latido) es una senal periodica que una activity envia al servidor Temporal para indicar que sigue viva y progresando. Es crucial para operaciones largas porque:
- Deteccion de fallos: Si Temporal no recibe heartbeats, asume que el worker fallo y reasigna la tarea
- Reporte de progreso: El heartbeat puede incluir datos sobre el avance
- Continuacion desde punto de fallo: Si la activity se reinicia, puede consultar el ultimo heartbeat y continuar desde ahi
Para operaciones largas, reporta progreso:
// activities_advanced.go
package main
import (
"context"
"time"
"go.temporal.io/sdk/activity"
)
type ShippingActivities struct{}
type ShipmentDetails struct {
ShipmentID string
TrackingNo string
Carrier string
Status string
}
func (a *ShippingActivities) ProcessLargeShipment(ctx context.Context, orderID string, items []OrderItem) (*ShipmentDetails, error) {
logger := activity.GetLogger(ctx)
for i, item := range items {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Procesar item
logger.Info("Processing item", "index", i, "productID", item.ProductID)
time.Sleep(time.Second) // Simular trabajo
// Reportar progreso
activity.RecordHeartbeat(ctx, i+1)
}
return &ShipmentDetails{
ShipmentID: generateID(),
TrackingNo: "TRK" + generateID()[:8],
Carrier: "FastShip",
Status: "scheduled",
}, nil
}
Workflow con Signals
Un Signal es un mensaje asincrono que se envia a un workflow en ejecucion para modificar su comportamiento o inyectar datos. A diferencia de las activities (que el workflow inicia), los signals vienen del exterior.
Casos de uso comunes:
- Actualizar informacion del pedido (cambio de direccion)
- Aprobar o rechazar manualmente una transaccion
- Cancelar un proceso en curso
Los signals se reciben a traves de un channel (canal), una primitiva de concurrencia de Go que permite comunicacion segura entre goroutines.
// workflow_signals.go
package main
import (
"go.temporal.io/sdk/workflow"
)
type OrderStatus struct {
OrderID string
Status string
UpdatedAt int64
}
func OrderWithUpdatesWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
logger := workflow.GetLogger(ctx)
var status OrderStatus
statusChannel := workflow.GetSignalChannel(ctx, "order-status-update")
// Goroutine para manejar signals
workflow.Go(ctx, func(ctx workflow.Context) {
for {
var update OrderStatus
statusChannel.Receive(ctx, &update)
status = update
logger.Info("Status updated", "status", status.Status)
}
})
// Ejecutar saga normalmente
result, err := executeSagaSteps(ctx, input)
if err != nil {
return nil, err
}
return result, nil
}
// Enviar signal desde cliente
func sendStatusUpdate(workflowID string, status OrderStatus) error {
c, _ := client.Dial(client.Options{})
defer c.Close()
return c.SignalWorkflow(context.Background(), workflowID, "", "order-status-update", status)
}
Workflow con Queries
Una Query permite consultar el estado interno de un workflow sin modificarlo. Es una operacion sincrona y de solo lectura, ideal para:
- Mostrar progreso en una UI
- Debugging y diagnostico
- Monitoreo y alertas
A diferencia de signals (que modifican estado), las queries solo leen. Son seguras de ejecutar multiples veces sin efectos secundarios.
El Query Handler es una funcion registrada en el workflow que responde a consultas externas. Debe ser rapido y no bloquear.
// workflow_queries.go
package main
import "go.temporal.io/sdk/workflow"
type SagaProgress struct {
CurrentStep string
CompletedSteps []string
FailedStep string
IsCompensating bool
}
func OrderWithQueryWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
progress := &SagaProgress{
CompletedSteps: make([]string, 0),
}
// Registrar query handler
err := workflow.SetQueryHandler(ctx, "get-progress", func() (*SagaProgress, error) {
return progress, nil
})
if err != nil {
return nil, err
}
// Paso 1
progress.CurrentStep = "create-order"
// ... ejecutar
progress.CompletedSteps = append(progress.CompletedSteps, "create-order")
// Paso 2
progress.CurrentStep = "reserve-stock"
// ... ejecutar
progress.CompletedSteps = append(progress.CompletedSteps, "reserve-stock")
// Continuar con otros pasos...
return &CreateOrderResult{Status: "completed"}, nil
}
// Consultar desde cliente
func queryProgress(workflowID string) (*SagaProgress, error) {
c, _ := client.Dial(client.Options{})
defer c.Close()
resp, err := c.QueryWorkflow(context.Background(), workflowID, "", "get-progress")
if err != nil {
return nil, err
}
var progress SagaProgress
err = resp.Get(&progress)
return &progress, err
}
Child Workflows
Un Child Workflow es un workflow iniciado por otro workflow (el “padre”). Permite dividir logica compleja en piezas mas pequenas y manejables.
Beneficios de usar child workflows:
- Modularidad: Cada workflow tiene una responsabilidad clara
- Reutilizacion: El mismo child puede ser usado por multiples padres
- Aislamiento de fallos: Un child puede fallar sin afectar directamente al padre
- Historial mas limpio: Cada workflow tiene su propio historial de eventos
El child workflow hereda el contexto del padre pero tiene su propia identidad y ciclo de vida.
// child_workflows.go
package main
import (
"time"
"go.temporal.io/sdk/workflow"
)
func MainOrderWorkflow(ctx workflow.Context, input CreateOrderInput) (*CreateOrderResult, error) {
childOptions := workflow.ChildWorkflowOptions{
WorkflowID: "payment-" + workflow.GetInfo(ctx).WorkflowExecution.ID,
}
childCtx := workflow.WithChildOptions(ctx, childOptions)
// Ejecutar workflow de pago como child
var paymentResult PaymentResult
err := workflow.ExecuteChildWorkflow(childCtx, PaymentWorkflow, PaymentInput{
OrderID: input.CustomerID,
Amount: input.Total,
}).Get(ctx, &paymentResult)
if err != nil {
return nil, err
}
return &CreateOrderResult{
PaymentID: paymentResult.PaymentID,
Status: "completed",
}, nil
}
type PaymentInput struct {
OrderID string
Amount float64
}
type PaymentResult struct {
PaymentID string
Status string
}
func PaymentWorkflow(ctx workflow.Context, input PaymentInput) (*PaymentResult, error) {
options := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, options)
var paymentAct *PaymentActivities
var paymentID string
err := workflow.ExecuteActivity(ctx, paymentAct.ProcessPayment, input.OrderID, input.Amount).Get(ctx, &paymentID)
if err != nil {
return nil, err
}
return &PaymentResult{
PaymentID: paymentID,
Status: "processed",
}, nil
}
Retry Policies Avanzadas
Las Retry Policies pueden configurarse con granularidad fina. Un aspecto importante es distinguir entre errores retriables y no retriables:
- Errores retriables: Timeouts de red, servicios temporalmente no disponibles, errores 5xx
- Errores no retriables: Validacion fallida, fondos insuficientes, datos invalidos
NonRetryableErrorTypes define tipos de error que no deben reintentarse. Esto evita gastar recursos reintentando operaciones que sabemos que fallaran.
// retry_policies.go
package main
import (
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
func configuredActivityOptions() workflow.ActivityOptions {
return workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
HeartbeatTimeout: 30 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 5,
NonRetryableErrorTypes: []string{"InvalidInputError", "InsufficientFundsError"},
},
}
}
// Error no retriable
type InvalidInputError struct {
Message string
}
func (e *InvalidInputError) Error() string {
return e.Message
}
func (a *PaymentActivities) ValidatePayment(ctx context.Context, amount float64) error {
if amount <= 0 {
return temporal.NewNonRetryableApplicationError(
"Invalid amount",
"InvalidInputError",
&InvalidInputError{Message: "Amount must be positive"},
)
}
return nil
}
Resumen
- Heartbeats para operaciones largas
- Signals modifican workflows en ejecucion
- Queries consultan estado sin modificar
- Child workflows dividen logica compleja
- Retry policies controlan comportamiento de reintentos
Glosario
Heartbeat
Definicion: Senal periodica que una activity envia al servidor Temporal para indicar que sigue viva y progresando en una operacion de larga duracion.
Por que es importante: Permite detectar workers que fallaron silenciosamente y reasignar la tarea. Tambien permite reportar progreso y continuar desde el ultimo punto conocido tras un reinicio.
Ejemplo practico: Una activity que procesa 1000 items envia un heartbeat cada 10 items. Si el worker falla en el item 500, el siguiente worker puede continuar desde ahi en lugar de empezar de cero.
Signal (Temporal)
Definicion: Mensaje asincrono enviado a un workflow en ejecucion desde el exterior para modificar su estado o comportamiento.
Por que es importante: Permite interaccion en tiempo real con workflows de larga duracion, como aprobar transacciones, actualizar datos o cancelar procesos.
Ejemplo practico: Un workflow de aprobacion de prestamo espera un signal “aprobar” o “rechazar” de un agente humano antes de continuar.
Query (Temporal)
Definicion: Operacion sincrona de solo lectura que consulta el estado interno de un workflow en ejecucion sin modificarlo.
Por que es importante: Proporciona visibilidad en tiempo real del progreso de un workflow para UIs, monitoreo y debugging sin afectar su ejecucion.
Ejemplo practico: Un endpoint REST consulta el progreso de una saga con QueryWorkflow("get-progress") y retorna “3 de 5 pasos completados”.
Child Workflow
Definicion: Workflow iniciado y coordinado por otro workflow (el padre), que tiene su propia identidad, historial y ciclo de vida.
Por que es importante: Permite descomponer workflows complejos en unidades mas pequenas, facilitando el mantenimiento, reutilizacion y testing.
Ejemplo practico: Un OrderWorkflow inicia un PaymentWorkflow como child. Si el pago falla, el child reporta el error al padre, que decide si compensar o reintentar.
Errores No Retriables
Definicion: Tipos de error que indican fallos permanentes y que no deben reintentarse porque el resultado sera siempre el mismo.
Por que es importante: Evita gastar tiempo y recursos reintentando operaciones condenadas al fracaso, acelerando la deteccion de problemas reales.
Ejemplo practico: Si ProcessPayment retorna “tarjeta invalida”, no tiene sentido reintentar. Se marca como NonRetryableApplicationError y la saga pasa directamente a compensacion.
StartToCloseTimeout
Definicion: Tiempo maximo permitido para que una activity complete su ejecucion desde que un worker la toma hasta que reporta exito o fallo.
Por que es importante: Protege contra activities que se quedan colgadas indefinidamente, garantizando que eventualmente Temporal detecte el problema y reintente.
Ejemplo practico: Con StartToCloseTimeout: 1 minute, si una llamada HTTP a un servicio externo no responde en 60 segundos, Temporal la marca como fallida y reintenta.
← Capítulo 14: Go con Temporal | Capítulo 16: Sagas en Python →