← Volver al listado de tecnologías

Select y Patrones de Concurrencia

Por: Artiko
goselectconcurrenciapatrones

Select y Patrones de Concurrencia

Select statement

select permite esperar en multiples operaciones de channels simultaneamente. Es como un switch pero para channels:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch1 := make(chan string)
	ch2 := make(chan string)

	go func() {
		time.Sleep(100 * time.Millisecond)
		ch1 <- "uno"
	}()
	go func() {
		time.Sleep(200 * time.Millisecond)
		ch2 <- "dos"
	}()

	// select espera al primer channel que este listo
	select {
	case msg := <-ch1:
		fmt.Println("recibido de ch1:", msg)
	case msg := <-ch2:
		fmt.Println("recibido de ch2:", msg)
	}
}
// recibido de ch1: uno

Si multiples cases estan listos simultaneamente, select elige uno al azar.

Default case

El default hace que select no bloquee:

package main

import "fmt"

func main() {
	ch := make(chan int)

	select {
	case v := <-ch:
		fmt.Println("recibido:", v)
	default:
		fmt.Println("no hay datos disponibles")
	}
}
// no hay datos disponibles

Util para polling no bloqueante:

func intentarEnviar(ch chan<- int, valor int) bool {
	select {
	case ch <- valor:
		return true // enviado
	default:
		return false // channel lleno o sin receptor
	}
}

Timeouts con time.After

time.After retorna un channel que recibe un valor despues de la duracion especificada:

package main

import (
	"fmt"
	"time"
)

func operacionLenta() <-chan string {
	ch := make(chan string)
	go func() {
		time.Sleep(3 * time.Second)
		ch <- "resultado"
	}()
	return ch
}

func main() {
	select {
	case resultado := <-operacionLenta():
		fmt.Println("exito:", resultado)
	case <-time.After(1 * time.Second):
		fmt.Println("timeout: la operacion tardo demasiado")
	}
}
// timeout: la operacion tardo demasiado

Timeout con context (mas idiomatico)

package main

import (
	"context"
	"fmt"
	"time"
)

func buscar(ctx context.Context, query string) (string, error) {
	ch := make(chan string)
	go func() {
		time.Sleep(2 * time.Second) // simula busqueda lenta
		ch <- "resultado para: " + query
	}()

	select {
	case resultado := <-ch:
		return resultado, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	resultado, err := buscar(ctx, "golang")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(resultado)
}
// error: context deadline exceeded

Patron Fan-In

Multiples productores envian a un solo consumidor. Combina multiples channels en uno:

package main

import (
	"fmt"
	"sync"
)

func productor(id int) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < 3; i++ {
			ch <- fmt.Sprintf("productor-%d: mensaje-%d", id, i)
		}
	}()
	return ch
}

func fanIn(channels ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup

	for _, ch := range channels {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(ch)
	}

	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	ch1 := productor(1)
	ch2 := productor(2)
	ch3 := productor(3)

	for msg := range fanIn(ch1, ch2, ch3) {
		fmt.Println(msg)
	}
}

Patron Fan-Out

Un productor distribuye trabajo entre multiples consumidores:

package main

import (
	"fmt"
	"sync"
)

func fanOut(in <-chan int, workers int) []<-chan int {
	outs := make([]<-chan int, workers)
	for i := 0; i < workers; i++ {
		ch := make(chan int)
		outs[i] = ch
		go func(id int, out chan<- int) {
			defer close(out)
			for v := range in {
				resultado := v * v // procesar
				fmt.Printf("worker-%d: %d^2 = %d\n", id, v, resultado)
				out <- resultado
			}
		}(i, ch)
	}
	return outs
}

func main() {
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 9; i++ {
			in <- i
		}
	}()

	resultados := fanOut(in, 3)

	var wg sync.WaitGroup
	for _, ch := range resultados {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for range c {
			}
		}(ch)
	}
	wg.Wait()
}

Patron Pipeline

Encadena etapas de procesamiento donde cada etapa es una goroutine:

package main

import "fmt"

// Etapa 1: genera numeros
func generar(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// Etapa 2: filtra pares
func filtrarPares(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
	}()
	return out
}

// Etapa 3: multiplica por 10
func multiplicar(in <-chan int, factor int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * factor
		}
	}()
	return out
}

func main() {
	// Pipeline: generar -> filtrar pares -> multiplicar x10
	numeros := generar(1, 2, 3, 4, 5, 6, 7, 8)
	pares := filtrarPares(numeros)
	resultado := multiplicar(pares, 10)

	for v := range resultado {
		fmt.Println(v)
	}
}
// 20
// 40
// 60
// 80

Patron Generator

Una funcion que retorna un channel de solo lectura. El productor corre en su propia goroutine:

package main

import (
	"fmt"
	"math/rand"
)

func generarIDs() <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < 5; i++ {
			id := fmt.Sprintf("ID-%04d", rand.Intn(10000))
			ch <- id
		}
	}()
	return ch
}

func main() {
	for id := range generarIDs() {
		fmt.Println(id)
	}
}

Patron Worker Pool

Un pool fijo de workers procesa tareas de una cola compartida:

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, tareas <-chan int, resultados chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for tarea := range tareas {
		fmt.Printf("worker %d: procesando tarea %d\n", id, tarea)
		time.Sleep(100 * time.Millisecond) // simula trabajo
		resultados <- tarea * 2
	}
}

func main() {
	const numWorkers = 3
	const numTareas = 10

	tareas := make(chan int, numTareas)
	resultados := make(chan int, numTareas)

	// Lanzar workers
	var wg sync.WaitGroup
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, tareas, resultados, &wg)
	}

	// Enviar tareas
	for i := 1; i <= numTareas; i++ {
		tareas <- i
	}
	close(tareas) // no hay mas tareas

	// Esperar a que terminen y cerrar resultados
	go func() {
		wg.Wait()
		close(resultados)
	}()

	// Recoger resultados
	for r := range resultados {
		fmt.Println("resultado:", r)
	}
}
ComponenteRol
tareas channelCola de trabajo compartida
resultados channelDonde los workers depositan resultados
sync.WaitGroupEspera a que todos los workers terminen
close(tareas)Senaliza que no hay mas trabajo

Rate Limiting con time.Tick

Controla la frecuencia de operaciones:

package main

import (
	"fmt"
	"time"
)

func main() {
	peticiones := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		peticiones <- i
	}
	close(peticiones)

	// Limitar a 1 peticion cada 200ms
	limiter := time.Tick(200 * time.Millisecond)

	for req := range peticiones {
		<-limiter // espera el tick
		fmt.Println("peticion", req, "a las", time.Now().Format("15:04:05.000"))
	}
}
// peticion 1 a las 10:30:00.200
// peticion 2 a las 10:30:00.400
// peticion 3 a las 10:30:00.600
// peticion 4 a las 10:30:00.800
// peticion 5 a las 10:30:01.000

Para rafagas (burst), usa un channel con buffer como token bucket: llena el buffer con N tokens iniciales y recarga con time.Tick.

Resumen

PatronUsoChannels
Fan-InCombinar multiples fuentesN inputs -> 1 output
Fan-OutDistribuir trabajo1 input -> N outputs
PipelineEtapas secuencialesCadena de channels
GeneratorProducir valores lazyRetorna <-chan T
Worker PoolParalelizar con pool fijotareas + resultados
Rate LimitingControlar frecuenciatime.Tick como limitador