Capítulo 16: Programación Asíncrona
Capítulo 16: Programación Asíncrona
La programación asíncrona en Lua se construye sobre las coroutines, permitiendo crear aplicaciones concurrentes sin la complejidad de threads o callbacks anidados. En este capítulo exploraremos cómo implementar event loops, patrones async comunes, y sistemas de alta concurrencia.
1. Event Loop Básico con Coroutines
Un event loop gestiona tareas asíncronas ejecutándolas cooperativamente:
-- Event loop básico
local EventLoop = {}
EventLoop.__index = EventLoop
function EventLoop.new()
local self = setmetatable({}, EventLoop)
self.tasks = {}
self.timers = {}
self.running = false
return self
end
function EventLoop:spawn(fn, ...)
local args = {...}
local co = coroutine.create(function()
fn(table.unpack(args))
end)
table.insert(self.tasks, {
coroutine = co,
status = "ready"
})
return co
end
function EventLoop:sleep(seconds)
local co = coroutine.running()
local wake_time = os.clock() + seconds
table.insert(self.timers, {
coroutine = co,
wake_time = wake_time
})
coroutine.yield()
end
function EventLoop:run()
self.running = true
while self.running and (#self.tasks > 0 or #self.timers > 0) do
-- Procesar tareas listas
local i = 1
while i <= #self.tasks do
local task = self.tasks[i]
if task.status == "ready" then
local success, err = coroutine.resume(task.coroutine)
if not success then
print("Error en tarea:", err)
table.remove(self.tasks, i)
elseif coroutine.status(task.coroutine) == "dead" then
table.remove(self.tasks, i)
else
i = i + 1
end
else
i = i + 1
end
end
-- Procesar timers
local current_time = os.clock()
local i = 1
while i <= #self.timers do
local timer = self.timers[i]
if current_time >= timer.wake_time then
-- Reactivar la coroutine
local task_found = false
for j, task in ipairs(self.tasks) do
if task.coroutine == timer.coroutine then
task.status = "ready"
task_found = true
break
end
end
if not task_found then
table.insert(self.tasks, {
coroutine = timer.coroutine,
status = "ready"
})
end
table.remove(self.timers, i)
else
i = i + 1
end
end
-- Pequeña pausa para no saturar CPU
if #self.tasks == 0 and #self.timers > 0 then
-- Calcular cuánto esperar hasta el próximo timer
local min_wait = math.huge
for _, timer in ipairs(self.timers) do
local wait = timer.wake_time - os.clock()
if wait < min_wait then
min_wait = wait
end
end
if min_wait > 0 then
-- En un sistema real, usaríamos select() o similar
os.execute("sleep " .. math.max(0.001, min_wait))
end
end
end
end
function EventLoop:stop()
self.running = false
end
-- Ejemplo de uso
local loop = EventLoop.new()
loop:spawn(function()
for i = 1, 3 do
print("Tarea 1 - iteración", i)
loop:sleep(1)
end
end)
loop:spawn(function()
for i = 1, 5 do
print("Tarea 2 - iteración", i)
loop:sleep(0.5)
end
end)
print("Iniciando event loop...")
loop:run()
print("Event loop finalizado")
Event Loop con Canales
Implementemos canales para comunicación entre coroutines:
-- Sistema de canales
local Channel = {}
Channel.__index = Channel
function Channel.new(buffer_size)
local self = setmetatable({}, Channel)
self.buffer = {}
self.buffer_size = buffer_size or 0
self.waiting_senders = {}
self.waiting_receivers = {}
return self
end
function Channel:send(value, loop)
-- Si hay receptores esperando, entregar directamente
if #self.waiting_receivers > 0 then
local receiver = table.remove(self.waiting_receivers, 1)
receiver.value = value
receiver.status = "ready"
return
end
-- Si hay espacio en el buffer, almacenar
if #self.buffer < self.buffer_size then
table.insert(self.buffer, value)
return
end
-- De lo contrario, esperar
local co = coroutine.running()
table.insert(self.waiting_senders, {
coroutine = co,
value = value
})
coroutine.yield()
end
function Channel:receive(loop)
-- Si hay datos en el buffer, tomar uno
if #self.buffer > 0 then
local value = table.remove(self.buffer, 1)
-- Si hay senders esperando, mover uno al buffer
if #self.waiting_senders > 0 then
local sender = table.remove(self.waiting_senders, 1)
table.insert(self.buffer, sender.value)
-- Reactivar el sender
for _, task in ipairs(loop.tasks) do
if task.coroutine == sender.coroutine then
task.status = "ready"
break
end
end
end
return value
end
-- Si hay senders esperando, tomar directamente
if #self.waiting_senders > 0 then
local sender = table.remove(self.waiting_senders, 1)
-- Reactivar el sender
for _, task in ipairs(loop.tasks) do
if task.coroutine == sender.coroutine then
task.status = "ready"
break
end
end
return sender.value
end
-- De lo contrario, esperar
local co = coroutine.running()
local receiver = {
coroutine = co,
value = nil,
status = "waiting"
}
table.insert(self.waiting_receivers, receiver)
table.insert(loop.tasks, {
coroutine = co,
status = "waiting"
})
coroutine.yield()
return receiver.value
end
-- Ejemplo con canales
local loop = EventLoop.new()
local ch = Channel.new(2)
loop:spawn(function()
for i = 1, 5 do
print("Enviando:", i)
ch:send(i, loop)
loop:sleep(0.3)
end
ch:send(nil, loop) -- Señal de fin
end)
loop:spawn(function()
while true do
local value = ch:receive(loop)
if value == nil then break end
print("Recibido:", value)
loop:sleep(0.7)
end
end)
loop:run()
2. Patrón Producer-Consumer
Implementemos el patrón clásico producer-consumer:
-- Queue thread-safe para producer-consumer
local Queue = {}
Queue.__index = Queue
function Queue.new()
local self = setmetatable({}, Queue)
self.items = {}
self.waiting = {}
return self
end
function Queue:enqueue(item)
table.insert(self.items, item)
-- Despertar un consumidor si está esperando
if #self.waiting > 0 then
local waiter = table.remove(self.waiting, 1)
coroutine.resume(waiter.coroutine, table.remove(self.items, 1))
end
end
function Queue:dequeue()
if #self.items > 0 then
return table.remove(self.items, 1)
end
-- Esperar hasta que haya un item
local co = coroutine.running()
table.insert(self.waiting, {coroutine = co})
return coroutine.yield()
end
-- Producer-Consumer con múltiples workers
local ProducerConsumer = {}
ProducerConsumer.__index = ProducerConsumer
function ProducerConsumer.new(loop, num_workers)
local self = setmetatable({}, ProducerConsumer)
self.loop = loop
self.queue = Queue.new()
self.num_workers = num_workers or 3
self.workers = {}
self.results = {}
return self
end
function ProducerConsumer:start_workers(process_fn)
for i = 1, self.num_workers do
local worker = self.loop:spawn(function()
while true do
local task = self.queue:dequeue()
if task == nil then break end -- Señal de parada
local result = process_fn(task, i)
table.insert(self.results, result)
end
print(string.format("Worker %d finalizado", i))
end)
table.insert(self.workers, worker)
end
end
function ProducerConsumer:submit(task)
self.queue:enqueue(task)
end
function ProducerConsumer:stop()
-- Enviar señal de parada a cada worker
for i = 1, self.num_workers do
self.queue:enqueue(nil)
end
end
-- Ejemplo de uso
local loop = EventLoop.new()
local pc = ProducerConsumer.new(loop, 3)
-- Función de procesamiento
local function process_task(task, worker_id)
print(string.format("Worker %d procesando: %s", worker_id, task.name))
loop:sleep(task.duration)
return {
task = task.name,
worker = worker_id,
result = task.value * 2
}
end
pc:start_workers(process_task)
-- Producer
loop:spawn(function()
local tasks = {
{name = "Task1", value = 10, duration = 1},
{name = "Task2", value = 20, duration = 0.5},
{name = "Task3", value = 30, duration = 1.5},
{name = "Task4", value = 40, duration = 0.8},
{name = "Task5", value = 50, duration = 1.2},
}
for _, task in ipairs(tasks) do
pc:submit(task)
loop:sleep(0.2)
end
pc:stop()
end)
loop:run()
print("\nResultados:")
for _, result in ipairs(pc.results) do
print(string.format("%s -> %d (worker %d)",
result.task, result.result, result.worker))
end
3. Pipelines de Datos Asincrónicos
Creemos pipelines para procesar datos en etapas:
-- Pipeline asíncrono
local Pipeline = {}
Pipeline.__index = Pipeline
function Pipeline.new(loop)
local self = setmetatable({}, Pipeline)
self.loop = loop
self.stages = {}
return self
end
function Pipeline:add_stage(name, fn, workers)
local stage = {
name = name,
fn = fn,
workers = workers or 1,
input = Channel.new(10),
output = Channel.new(10)
}
table.insert(self.stages, stage)
return self
end
function Pipeline:start()
for i, stage in ipairs(self.stages) do
local input_ch = stage.input
local output_ch = stage.output
-- Si no es la última etapa, conectar con la siguiente
if i < #self.stages then
output_ch = self.stages[i + 1].input
end
-- Crear workers para esta etapa
for w = 1, stage.workers do
self.loop:spawn(function()
while true do
local data = input_ch:receive(self.loop)
if data == nil then
-- Propagar señal de fin
if output_ch ~= stage.output then
output_ch:send(nil, self.loop)
end
break
end
local result = stage.fn(data, w)
if result ~= nil then
if output_ch == stage.output then
table.insert(stage.output.buffer, result)
else
output_ch:send(result, self.loop)
end
end
end
print(string.format("Stage '%s' worker %d finalizado", stage.name, w))
end)
end
end
end
function Pipeline:send(data)
if #self.stages > 0 then
self.stages[1].input:send(data, self.loop)
end
end
function Pipeline:close()
if #self.stages > 0 then
self.stages[1].input:send(nil, self.loop)
end
end
function Pipeline:get_results()
if #self.stages > 0 then
return self.stages[#self.stages].output.buffer
end
return {}
end
-- Ejemplo: Pipeline de procesamiento de datos
local loop = EventLoop.new()
local pipe = Pipeline.new(loop)
pipe:add_stage("parse", function(data, worker)
print(string.format("[parse:%d] %s", worker, data))
loop:sleep(0.1)
return tonumber(data)
end, 2)
pipe:add_stage("transform", function(data, worker)
print(string.format("[transform:%d] %d", worker, data))
loop:sleep(0.15)
return data * data
end, 2)
pipe:add_stage("format", function(data, worker)
print(string.format("[format:%d] %d", worker, data))
loop:sleep(0.1)
return string.format("resultado: %d", data)
end, 1)
pipe:start()
-- Enviar datos al pipeline
loop:spawn(function()
for i = 1, 5 do
pipe:send(tostring(i))
loop:sleep(0.05)
end
pipe:close()
end)
loop:run()
print("\nResultados finales:")
for _, result in ipairs(pipe:get_results()) do
print(result)
end
4. Timeout y Cancelación
Implementemos mecanismos de timeout y cancelación:
-- Sistema de timeout
local Timeout = {}
Timeout.__index = Timeout
function Timeout.new(loop, seconds)
local self = setmetatable({}, Timeout)
self.loop = loop
self.seconds = seconds
self.cancelled = false
return self
end
function Timeout:run(fn, ...)
local args = {...}
local result = nil
local completed = false
local timed_out = false
-- Coroutine para la tarea
local task_co = self.loop:spawn(function()
result = fn(table.unpack(args))
completed = true
end)
-- Coroutine para el timeout
self.loop:spawn(function()
self.loop:sleep(self.seconds)
if not completed and not self.cancelled then
timed_out = true
print("Timeout alcanzado")
end
end)
return function()
return completed, timed_out, result
end
end
function Timeout:cancel()
self.cancelled = true
end
-- Context con cancelación
local Context = {}
Context.__index = Context
function Context.new(loop)
local self = setmetatable({}, Context)
self.loop = loop
self.cancelled = false
self.cancel_callbacks = {}
return self
end
function Context:is_cancelled()
return self.cancelled
end
function Context:cancel()
if self.cancelled then return end
self.cancelled = true
for _, callback in ipairs(self.cancel_callbacks) do
callback()
end
end
function Context:on_cancel(callback)
table.insert(self.cancel_callbacks, callback)
end
function Context:with_timeout(seconds)
local timeout_ctx = Context.new(self.loop)
-- Heredar cancelación del padre
self:on_cancel(function()
timeout_ctx:cancel()
end)
-- Configurar timeout
self.loop:spawn(function()
self.loop:sleep(seconds)
timeout_ctx:cancel()
end)
return timeout_ctx
end
-- Ejemplo de uso
local loop = EventLoop.new()
-- Tarea con timeout
local function long_running_task(ctx)
for i = 1, 10 do
if ctx:is_cancelled() then
print("Tarea cancelada en iteración", i)
return nil
end
print("Trabajando... iteración", i)
loop:sleep(0.5)
end
return "completado"
end
local ctx = Context.new(loop)
local timeout_ctx = ctx:with_timeout(3)
loop:spawn(function()
local result = long_running_task(timeout_ctx)
if result then
print("Resultado:", result)
else
print("Tarea no completada")
end
end)
loop:run()
5. Manejo de Errores en Código Async
Estrategias para manejar errores en código asíncrono:
-- Future/Promise para manejo de errores
local Future = {}
Future.__index = Future
function Future.new(loop)
local self = setmetatable({}, Future)
self.loop = loop
self.completed = false
self.success = false
self.value = nil
self.error = nil
self.callbacks = {}
return self
end
function Future:resolve(value)
if self.completed then return end
self.completed = true
self.success = true
self.value = value
for _, callback in ipairs(self.callbacks) do
if callback.on_success then
callback.on_success(value)
end
end
end
function Future:reject(error)
if self.completed then return end
self.completed = true
self.success = false
self.error = error
for _, callback in ipairs(self.callbacks) do
if callback.on_error then
callback.on_error(error)
end
end
end
function Future:then_do(on_success, on_error)
local next_future = Future.new(self.loop)
local callback = {
on_success = function(value)
if on_success then
local success, result = pcall(on_success, value)
if success then
next_future:resolve(result)
else
next_future:reject(result)
end
else
next_future:resolve(value)
end
end,
on_error = function(err)
if on_error then
local success, result = pcall(on_error, err)
if success then
next_future:resolve(result)
else
next_future:reject(result)
end
else
next_future:reject(err)
end
end
}
if self.completed then
if self.success then
callback.on_success(self.value)
else
callback.on_error(self.error)
end
else
table.insert(self.callbacks, callback)
end
return next_future
end
function Future:catch_error(on_error)
return self:then_do(nil, on_error)
end
-- Función async que retorna Future
local function fetch_data(loop, url, delay)
local future = Future.new(loop)
loop:spawn(function()
loop:sleep(delay)
-- Simular error random
if math.random() > 0.7 then
future:reject("Error de red: " .. url)
else
future:resolve({url = url, data = "Datos de " .. url})
end
end)
return future
end
-- Ejemplo de uso
local loop = EventLoop.new()
loop:spawn(function()
fetch_data(loop, "https://api.example.com/users", 1)
:then_do(function(response)
print("Éxito:", response.data)
return response.data
end)
:then_do(function(data)
print("Procesando:", data)
return string.upper(data)
end)
:then_do(function(result)
print("Resultado final:", result)
end)
:catch_error(function(err)
print("Error capturado:", err)
end)
end)
loop:run()
6. DEEP DIVE: Event Loop Internals
Analicemos cómo funciona un event loop real:
--[[
ESTRUCTURA INTERNA DE UN EVENT LOOP
1. Task Queue:
- Cola de tareas listas para ejecutar
- Cada tarea es una coroutine con estado
- Estados: ready, waiting, blocked, dead
2. Timer Wheel:
- Estructura de datos para timers eficientes
- Organiza timers en "buckets" por tiempo
- O(1) para inserción, O(1) amortizado para tick
3. I/O Selector:
- En sistemas reales: epoll (Linux), kqueue (BSD), IOCP (Windows)
- Notifica cuando un descriptor está listo
- Integrado con el event loop principal
4. Microtask Queue:
- Para callbacks de alta prioridad
- Se ejecutan antes de tareas normales
- Evita starvation con límite de ejecución
]]
-- Event Loop avanzado con microtasks
local AdvancedEventLoop = {}
AdvancedEventLoop.__index = AdvancedEventLoop
function AdvancedEventLoop.new()
local self = setmetatable({}, AdvancedEventLoop)
self.tasks = {}
self.microtasks = {}
self.timers = {}
self.io_watchers = {}
self.running = false
self.tick_count = 0
return self
end
function AdvancedEventLoop:queue_microtask(fn)
table.insert(self.microtasks, {
fn = fn,
created_at = self.tick_count
})
end
function AdvancedEventLoop:process_microtasks()
local count = 0
local max_microtasks = 100 -- Prevenir starvation
while #self.microtasks > 0 and count < max_microtasks do
local microtask = table.remove(self.microtasks, 1)
local success, err = pcall(microtask.fn)
if not success then
print("Error en microtask:", err)
end
count = count + 1
end
if #self.microtasks > 0 then
print(string.format("Advertencia: %d microtasks pendientes", #self.microtasks))
end
end
function AdvancedEventLoop:tick()
self.tick_count = self.tick_count + 1
-- 1. Procesar microtasks primero
self:process_microtasks()
-- 2. Procesar tareas normales
local processed = 0
local i = 1
while i <= #self.tasks and processed < 10 do
local task = self.tasks[i]
if task.status == "ready" then
local success, err = coroutine.resume(task.coroutine)
if not success then
print("Error en tarea:", err)
table.remove(self.tasks, i)
elseif coroutine.status(task.coroutine) == "dead" then
table.remove(self.tasks, i)
else
i = i + 1
end
processed = processed + 1
else
i = i + 1
end
end
-- 3. Procesar timers
local current_time = os.clock()
for i = #self.timers, 1, -1 do
local timer = self.timers[i]
if current_time >= timer.wake_time then
self:queue_microtask(function()
for _, task in ipairs(self.tasks) do
if task.coroutine == timer.coroutine then
task.status = "ready"
break
end
end
end)
table.remove(self.timers, i)
end
end
end
-- Métricas del event loop
function AdvancedEventLoop:get_metrics()
return {
tick_count = self.tick_count,
tasks = #self.tasks,
microtasks = #self.microtasks,
timers = #self.timers,
io_watchers = #self.io_watchers
}
end
print([[
OPTIMIZACIONES COMUNES:
1. Timer Wheel: O(1) scheduling
- Array circular de buckets
- Cada bucket contiene timers para ese intervalo
- Puntero avanza cada tick
2. I/O Multiplexing:
- select(): Portable pero limitado (1024 fds)
- epoll(): Linux, eficiente para muchos fds
- kqueue(): BSD/macOS, edge-triggered
- IOCP(): Windows, completion-based
3. Task Scheduling:
- Round-robin para fairness
- Priority queues para prioridades
- Work stealing para balance de carga
4. Memory Management:
- Pool de coroutines reutilizables
- Buffer pools para I/O
- Weak tables para caches
]])
7. SOAPBOX: Lua Coroutines vs Node.js
Mi opinión sobre diferentes modelos de concurrencia:
print([[
═══════════════════════════════════════════════════════════════
SOAPBOX: Lua Coroutines vs Node.js Event Loop
═══════════════════════════════════════════════════════════════
Lua Coroutines:
✓ Control explícito de concurrencia
✓ Sin callback hell
✓ Stack completo por coroutine
✓ Debugging más simple
✗ Require diseño cuidadoso
✗ Sin ecosistema async masivo
Node.js Event Loop:
✓ Ecosistema masivo (npm)
✓ Async/await nativo
✓ I/O no bloqueante por defecto
✗ Callback hell (pre-async/await)
✗ Stack traces fragmentados
✗ Menos control sobre scheduling
VEREDICTO:
Lua coroutines ofrecen un modelo MÁS SIMPLE y PREDECIBLE que
callbacks o promesas. Con async/await, la brecha se reduce,
pero Lua sigue ganando en:
1. Simplicidad conceptual
2. Control fino de ejecución
3. Debugging más claro
4. Menor overhead de memoria
Node.js gana en ecosistema y herramientas, pero si buscas
entender concurrencia profundamente, Lua es mejor profesor.
La belleza de Lua es que puedes construir CUALQUIER modelo
de concurrencia sobre coroutines: actors, CSP, futures,
streams... Es un lienzo en blanco.
═══════════════════════════════════════════════════════════════
]])
-- Comparación de código
print("\n-- CALLBACK HELL (Node.js style):")
print([[
fetchUser(userId, function(err, user) {
if (err) return handleError(err);
fetchPosts(user.id, function(err, posts) {
if (err) return handleError(err);
fetchComments(posts[0].id, function(err, comments) {
if (err) return handleError(err);
render(user, posts, comments);
});
});
});
]])
print("\n-- LUA COROUTINES (limpio y secuencial):")
print([[
local user = fetchUser(userId)
local posts = fetchPosts(user.id)
local comments = fetchComments(posts[1].id)
render(user, posts, comments)
]])
print("\n-- ASYNC/AWAIT (Node.js moderno):")
print([[
try {
const user = await fetchUser(userId);
const posts = await fetchPosts(user.id);
const comments = await fetchComments(posts[0].id);
render(user, posts, comments);
} catch (err) {
handleError(err);
}
]])
print("\n¡Lua lo tenía resuelto desde los 90s!")
8. Caso Práctico: Web Scraper Asíncrono
Implementemos un web scraper simple usando nuestro event loop:
-- Web scraper asíncrono (simulado)
local WebScraper = {}
WebScraper.__index = WebScraper
function WebScraper.new(loop, max_concurrent)
local self = setmetatable({}, WebScraper)
self.loop = loop
self.max_concurrent = max_concurrent or 3
self.pending = {}
self.results = {}
self.active = 0
return self
end
function WebScraper:fetch_url(url)
-- Simular HTTP request
local future = Future.new(self.loop)
self.loop:spawn(function()
local delay = math.random() * 2 + 0.5
self.loop:sleep(delay)
if math.random() > 0.1 then
future:resolve({
url = url,
status = 200,
body = string.format("Contenido de %s", url),
links = self:extract_links(url)
})
else
future:reject("Error 404: " .. url)
end
end)
return future
end
function WebScraper:extract_links(url)
-- Simular extracción de links
local num_links = math.random(2, 5)
local links = {}
for i = 1, num_links do
table.insert(links, string.format("%s/page%d", url, i))
end
return links
end
function WebScraper:crawl(start_url, max_depth)
local visited = {}
local queue = {{url = start_url, depth = 0}}
local function process_next()
if #queue == 0 then return end
local item = table.remove(queue, 1)
if visited[item.url] or item.depth > max_depth then
process_next()
return
end
visited[item.url] = true
self.active = self.active + 1
print(string.format("Fetching [depth=%d]: %s", item.depth, item.url))
self:fetch_url(item.url)
:then_do(function(response)
table.insert(self.results, {
url = item.url,
depth = item.depth,
status = response.status
})
if item.depth < max_depth then
for _, link in ipairs(response.links) do
if not visited[link] then
table.insert(queue, {url = link, depth = item.depth + 1})
end
end
end
self.active = self.active - 1
if self.active < self.max_concurrent then
process_next()
end
end)
:catch_error(function(err)
print("Error:", err)
self.active = self.active - 1
process_next()
end)
end
-- Iniciar crawlers concurrentes
for i = 1, self.max_concurrent do
self.loop:spawn(function()
process_next()
end)
end
end
-- Ejemplo de uso
local loop = AdvancedEventLoop.new()
local scraper = WebScraper.new(loop, 3)
scraper:crawl("https://example.com", 2)
-- Correr hasta que termine
loop:spawn(function()
while scraper.active > 0 or #scraper.pending > 0 do
loop:sleep(0.1)
end
loop:stop()
end)
loop:run()
print("\n=== RESULTADOS DEL SCRAPING ===")
for _, result in ipairs(scraper.results) do
print(string.format("[depth=%d] %s -> %d",
result.depth, result.url, result.status))
end
9. Ejercicios
Ejercicio 1: Rate Limiter Asíncrono
Implementa un rate limiter que controle cuántas operaciones se pueden ejecutar por segundo:
--[[
Crea un RateLimiter que:
1. Limite operaciones a N por segundo
2. Encole operaciones excedentes
3. Use token bucket o leaky bucket algorithm
4. Soporte múltiples clientes con diferentes límites
Ejemplo:
local limiter = RateLimiter.new(loop, 5) -- 5 ops/sec
for i = 1, 20 do
limiter:execute(function()
print("Operación", i)
end)
end
BONUS: Implementa burst capacity (ráfagas)
]]
Ejercicio 2: Async Pool con Retry Logic
Crea un pool de workers con reintentos automáticos:
--[[
Implementa AsyncPool con:
1. Pool de N workers
2. Retry automático con backoff exponencial
3. Circuit breaker para fallos continuos
4. Métricas: éxitos, fallos, reintentos
Ejemplo:
local pool = AsyncPool.new(loop, {
workers = 3,
max_retries = 3,
backoff_base = 2
})
pool:submit(function()
-- Tarea que puede fallar
if math.random() > 0.7 then
error("Fallo temporal")
end
return "éxito"
end)
print(pool:get_metrics())
]]
Ejercicio 3: Reactive Stream
Implementa un sistema de streams reactivos estilo RxJS:
--[[
Crea Observable con operadores:
1. map, filter, reduce
2. debounce, throttle
3. merge, concat, zip
4. error handling y retry
Ejemplo:
local clicks = Observable.from_events(button, "click")
clicks
:debounce(300)
:map(function(e) return e.x end)
:filter(function(x) return x > 100 end)
:subscribe(function(x)
print("Click válido en x:", x)
end)
BONUS: Implementa hot vs cold observables
]]
Recursos Adicionales
- LuaSocket: Librería I/O asíncrona para Lua
- Copas: Framework asíncrono sobre LuaSocket
- Luvit: Node.js-style async I/O para Lua
- lua-http: Cliente/servidor HTTP asíncrono
Conclusión
La programación asíncrona en Lua es elegante y poderosa gracias a las coroutines. Hemos visto cómo construir event loops, implementar patrones async comunes, y crear sistemas concurrentes sin la complejidad de threads o callbacks anidados.
En el próximo capítulo exploraremos Testing Avanzado, donde aprenderemos a escribir tests robustos para código asíncrono, mocking, property-based testing, y más.
Próximo capítulo: Testing Avanzado - Mocks, spies, property-based testing, y coverage.