Select y Patrones de Concurrencia
Select y Patrones de Concurrencia
Select statement
select permite esperar en multiples operaciones de channels simultaneamente. Es como un switch pero para channels:
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "uno"
}()
go func() {
time.Sleep(200 * time.Millisecond)
ch2 <- "dos"
}()
// select espera al primer channel que este listo
select {
case msg := <-ch1:
fmt.Println("recibido de ch1:", msg)
case msg := <-ch2:
fmt.Println("recibido de ch2:", msg)
}
}
// recibido de ch1: uno
Si multiples cases estan listos simultaneamente, select elige uno al azar.
Default case
El default hace que select no bloquee:
package main
import "fmt"
func main() {
ch := make(chan int)
select {
case v := <-ch:
fmt.Println("recibido:", v)
default:
fmt.Println("no hay datos disponibles")
}
}
// no hay datos disponibles
Util para polling no bloqueante:
func intentarEnviar(ch chan<- int, valor int) bool {
select {
case ch <- valor:
return true // enviado
default:
return false // channel lleno o sin receptor
}
}
Timeouts con time.After
time.After retorna un channel que recibe un valor despues de la duracion especificada:
package main
import (
"fmt"
"time"
)
func operacionLenta() <-chan string {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "resultado"
}()
return ch
}
func main() {
select {
case resultado := <-operacionLenta():
fmt.Println("exito:", resultado)
case <-time.After(1 * time.Second):
fmt.Println("timeout: la operacion tardo demasiado")
}
}
// timeout: la operacion tardo demasiado
Timeout con context (mas idiomatico)
package main
import (
"context"
"fmt"
"time"
)
func buscar(ctx context.Context, query string) (string, error) {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second) // simula busqueda lenta
ch <- "resultado para: " + query
}()
select {
case resultado := <-ch:
return resultado, nil
case <-ctx.Done():
return "", ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
resultado, err := buscar(ctx, "golang")
if err != nil {
fmt.Println("error:", err)
return
}
fmt.Println(resultado)
}
// error: context deadline exceeded
Patron Fan-In
Multiples productores envian a un solo consumidor. Combina multiples channels en uno:
package main
import (
"fmt"
"sync"
)
func productor(id int) <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < 3; i++ {
ch <- fmt.Sprintf("productor-%d: mensaje-%d", id, i)
}
}()
return ch
}
func fanIn(channels ...<-chan string) <-chan string {
out := make(chan string)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
ch1 := productor(1)
ch2 := productor(2)
ch3 := productor(3)
for msg := range fanIn(ch1, ch2, ch3) {
fmt.Println(msg)
}
}
Patron Fan-Out
Un productor distribuye trabajo entre multiples consumidores:
package main
import (
"fmt"
"sync"
)
func fanOut(in <-chan int, workers int) []<-chan int {
outs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
outs[i] = ch
go func(id int, out chan<- int) {
defer close(out)
for v := range in {
resultado := v * v // procesar
fmt.Printf("worker-%d: %d^2 = %d\n", id, v, resultado)
out <- resultado
}
}(i, ch)
}
return outs
}
func main() {
in := make(chan int)
go func() {
defer close(in)
for i := 1; i <= 9; i++ {
in <- i
}
}()
resultados := fanOut(in, 3)
var wg sync.WaitGroup
for _, ch := range resultados {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for range c {
}
}(ch)
}
wg.Wait()
}
Patron Pipeline
Encadena etapas de procesamiento donde cada etapa es una goroutine:
package main
import "fmt"
// Etapa 1: genera numeros
func generar(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Etapa 2: filtra pares
func filtrarPares(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
if n%2 == 0 {
out <- n
}
}
}()
return out
}
// Etapa 3: multiplica por 10
func multiplicar(in <-chan int, factor int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * factor
}
}()
return out
}
func main() {
// Pipeline: generar -> filtrar pares -> multiplicar x10
numeros := generar(1, 2, 3, 4, 5, 6, 7, 8)
pares := filtrarPares(numeros)
resultado := multiplicar(pares, 10)
for v := range resultado {
fmt.Println(v)
}
}
// 20
// 40
// 60
// 80
Patron Generator
Una funcion que retorna un channel de solo lectura. El productor corre en su propia goroutine:
package main
import (
"fmt"
"math/rand"
)
func generarIDs() <-chan string {
ch := make(chan string)
go func() {
defer close(ch)
for i := 0; i < 5; i++ {
id := fmt.Sprintf("ID-%04d", rand.Intn(10000))
ch <- id
}
}()
return ch
}
func main() {
for id := range generarIDs() {
fmt.Println(id)
}
}
Patron Worker Pool
Un pool fijo de workers procesa tareas de una cola compartida:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, tareas <-chan int, resultados chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for tarea := range tareas {
fmt.Printf("worker %d: procesando tarea %d\n", id, tarea)
time.Sleep(100 * time.Millisecond) // simula trabajo
resultados <- tarea * 2
}
}
func main() {
const numWorkers = 3
const numTareas = 10
tareas := make(chan int, numTareas)
resultados := make(chan int, numTareas)
// Lanzar workers
var wg sync.WaitGroup
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, tareas, resultados, &wg)
}
// Enviar tareas
for i := 1; i <= numTareas; i++ {
tareas <- i
}
close(tareas) // no hay mas tareas
// Esperar a que terminen y cerrar resultados
go func() {
wg.Wait()
close(resultados)
}()
// Recoger resultados
for r := range resultados {
fmt.Println("resultado:", r)
}
}
| Componente | Rol |
|---|---|
tareas channel | Cola de trabajo compartida |
resultados channel | Donde los workers depositan resultados |
sync.WaitGroup | Espera a que todos los workers terminen |
close(tareas) | Senaliza que no hay mas trabajo |
Rate Limiting con time.Tick
Controla la frecuencia de operaciones:
package main
import (
"fmt"
"time"
)
func main() {
peticiones := make(chan int, 5)
for i := 1; i <= 5; i++ {
peticiones <- i
}
close(peticiones)
// Limitar a 1 peticion cada 200ms
limiter := time.Tick(200 * time.Millisecond)
for req := range peticiones {
<-limiter // espera el tick
fmt.Println("peticion", req, "a las", time.Now().Format("15:04:05.000"))
}
}
// peticion 1 a las 10:30:00.200
// peticion 2 a las 10:30:00.400
// peticion 3 a las 10:30:00.600
// peticion 4 a las 10:30:00.800
// peticion 5 a las 10:30:01.000
Para rafagas (burst), usa un channel con buffer como token bucket: llena el buffer con N tokens iniciales y recarga con time.Tick.
Resumen
| Patron | Uso | Channels |
|---|---|---|
| Fan-In | Combinar multiples fuentes | N inputs -> 1 output |
| Fan-Out | Distribuir trabajo | 1 input -> N outputs |
| Pipeline | Etapas secuenciales | Cadena de channels |
| Generator | Producir valores lazy | Retorna <-chan T |
| Worker Pool | Paralelizar con pool fijo | tareas + resultados |
| Rate Limiting | Controlar frecuencia | time.Tick como limitador |
selectmultiplexa operaciones sobre channels; elige al azar si hay multiples listosdefaulten select evita bloqueo;time.Afterimplementa timeouts- Fan-In combina fuentes; Fan-Out distribuye trabajo; Pipeline encadena etapas
- Worker Pool limita concurrencia a N goroutines fijas
time.Tickpermite rate limiting simple y con rafagas