← Volver al listado de tecnologías

Capítulo 16: Programación Asíncrona

Por: Artiko
luaasyncevent-loopcoroutines

Capítulo 16: Programación Asíncrona

La programación asíncrona en Lua se construye sobre las coroutines, permitiendo crear aplicaciones concurrentes sin la complejidad de threads o callbacks anidados. En este capítulo exploraremos cómo implementar event loops, patrones async comunes, y sistemas de alta concurrencia.

1. Event Loop Básico con Coroutines

Un event loop gestiona tareas asíncronas ejecutándolas cooperativamente:

-- Event loop básico
local EventLoop = {}
EventLoop.__index = EventLoop

function EventLoop.new()
    local self = setmetatable({}, EventLoop)
    self.tasks = {}
    self.timers = {}
    self.running = false
    return self
end

function EventLoop:spawn(fn, ...)
    local args = {...}
    local co = coroutine.create(function()
        fn(table.unpack(args))
    end)

    table.insert(self.tasks, {
        coroutine = co,
        status = "ready"
    })

    return co
end

function EventLoop:sleep(seconds)
    local co = coroutine.running()
    local wake_time = os.clock() + seconds

    table.insert(self.timers, {
        coroutine = co,
        wake_time = wake_time
    })

    coroutine.yield()
end

function EventLoop:run()
    self.running = true

    while self.running and (#self.tasks > 0 or #self.timers > 0) do
        -- Procesar tareas listas
        local i = 1
        while i <= #self.tasks do
            local task = self.tasks[i]

            if task.status == "ready" then
                local success, err = coroutine.resume(task.coroutine)

                if not success then
                    print("Error en tarea:", err)
                    table.remove(self.tasks, i)
                elseif coroutine.status(task.coroutine) == "dead" then
                    table.remove(self.tasks, i)
                else
                    i = i + 1
                end
            else
                i = i + 1
            end
        end

        -- Procesar timers
        local current_time = os.clock()
        local i = 1
        while i <= #self.timers do
            local timer = self.timers[i]

            if current_time >= timer.wake_time then
                -- Reactivar la coroutine
                local task_found = false
                for j, task in ipairs(self.tasks) do
                    if task.coroutine == timer.coroutine then
                        task.status = "ready"
                        task_found = true
                        break
                    end
                end

                if not task_found then
                    table.insert(self.tasks, {
                        coroutine = timer.coroutine,
                        status = "ready"
                    })
                end

                table.remove(self.timers, i)
            else
                i = i + 1
            end
        end

        -- Pequeña pausa para no saturar CPU
        if #self.tasks == 0 and #self.timers > 0 then
            -- Calcular cuánto esperar hasta el próximo timer
            local min_wait = math.huge
            for _, timer in ipairs(self.timers) do
                local wait = timer.wake_time - os.clock()
                if wait < min_wait then
                    min_wait = wait
                end
            end

            if min_wait > 0 then
                -- En un sistema real, usaríamos select() o similar
                os.execute("sleep " .. math.max(0.001, min_wait))
            end
        end
    end
end

function EventLoop:stop()
    self.running = false
end

-- Ejemplo de uso
local loop = EventLoop.new()

loop:spawn(function()
    for i = 1, 3 do
        print("Tarea 1 - iteración", i)
        loop:sleep(1)
    end
end)

loop:spawn(function()
    for i = 1, 5 do
        print("Tarea 2 - iteración", i)
        loop:sleep(0.5)
    end
end)

print("Iniciando event loop...")
loop:run()
print("Event loop finalizado")

Event Loop con Canales

Implementemos canales para comunicación entre coroutines:

-- Sistema de canales
local Channel = {}
Channel.__index = Channel

function Channel.new(buffer_size)
    local self = setmetatable({}, Channel)
    self.buffer = {}
    self.buffer_size = buffer_size or 0
    self.waiting_senders = {}
    self.waiting_receivers = {}
    return self
end

function Channel:send(value, loop)
    -- Si hay receptores esperando, entregar directamente
    if #self.waiting_receivers > 0 then
        local receiver = table.remove(self.waiting_receivers, 1)
        receiver.value = value
        receiver.status = "ready"
        return
    end

    -- Si hay espacio en el buffer, almacenar
    if #self.buffer < self.buffer_size then
        table.insert(self.buffer, value)
        return
    end

    -- De lo contrario, esperar
    local co = coroutine.running()
    table.insert(self.waiting_senders, {
        coroutine = co,
        value = value
    })
    coroutine.yield()
end

function Channel:receive(loop)
    -- Si hay datos en el buffer, tomar uno
    if #self.buffer > 0 then
        local value = table.remove(self.buffer, 1)

        -- Si hay senders esperando, mover uno al buffer
        if #self.waiting_senders > 0 then
            local sender = table.remove(self.waiting_senders, 1)
            table.insert(self.buffer, sender.value)
            -- Reactivar el sender
            for _, task in ipairs(loop.tasks) do
                if task.coroutine == sender.coroutine then
                    task.status = "ready"
                    break
                end
            end
        end

        return value
    end

    -- Si hay senders esperando, tomar directamente
    if #self.waiting_senders > 0 then
        local sender = table.remove(self.waiting_senders, 1)
        -- Reactivar el sender
        for _, task in ipairs(loop.tasks) do
            if task.coroutine == sender.coroutine then
                task.status = "ready"
                break
            end
        end
        return sender.value
    end

    -- De lo contrario, esperar
    local co = coroutine.running()
    local receiver = {
        coroutine = co,
        value = nil,
        status = "waiting"
    }
    table.insert(self.waiting_receivers, receiver)
    table.insert(loop.tasks, {
        coroutine = co,
        status = "waiting"
    })
    coroutine.yield()
    return receiver.value
end

-- Ejemplo con canales
local loop = EventLoop.new()
local ch = Channel.new(2)

loop:spawn(function()
    for i = 1, 5 do
        print("Enviando:", i)
        ch:send(i, loop)
        loop:sleep(0.3)
    end
    ch:send(nil, loop) -- Señal de fin
end)

loop:spawn(function()
    while true do
        local value = ch:receive(loop)
        if value == nil then break end
        print("Recibido:", value)
        loop:sleep(0.7)
    end
end)

loop:run()

2. Patrón Producer-Consumer

Implementemos el patrón clásico producer-consumer:

-- Queue thread-safe para producer-consumer
local Queue = {}
Queue.__index = Queue

function Queue.new()
    local self = setmetatable({}, Queue)
    self.items = {}
    self.waiting = {}
    return self
end

function Queue:enqueue(item)
    table.insert(self.items, item)

    -- Despertar un consumidor si está esperando
    if #self.waiting > 0 then
        local waiter = table.remove(self.waiting, 1)
        coroutine.resume(waiter.coroutine, table.remove(self.items, 1))
    end
end

function Queue:dequeue()
    if #self.items > 0 then
        return table.remove(self.items, 1)
    end

    -- Esperar hasta que haya un item
    local co = coroutine.running()
    table.insert(self.waiting, {coroutine = co})
    return coroutine.yield()
end

-- Producer-Consumer con múltiples workers
local ProducerConsumer = {}
ProducerConsumer.__index = ProducerConsumer

function ProducerConsumer.new(loop, num_workers)
    local self = setmetatable({}, ProducerConsumer)
    self.loop = loop
    self.queue = Queue.new()
    self.num_workers = num_workers or 3
    self.workers = {}
    self.results = {}
    return self
end

function ProducerConsumer:start_workers(process_fn)
    for i = 1, self.num_workers do
        local worker = self.loop:spawn(function()
            while true do
                local task = self.queue:dequeue()
                if task == nil then break end -- Señal de parada

                local result = process_fn(task, i)
                table.insert(self.results, result)
            end
            print(string.format("Worker %d finalizado", i))
        end)
        table.insert(self.workers, worker)
    end
end

function ProducerConsumer:submit(task)
    self.queue:enqueue(task)
end

function ProducerConsumer:stop()
    -- Enviar señal de parada a cada worker
    for i = 1, self.num_workers do
        self.queue:enqueue(nil)
    end
end

-- Ejemplo de uso
local loop = EventLoop.new()
local pc = ProducerConsumer.new(loop, 3)

-- Función de procesamiento
local function process_task(task, worker_id)
    print(string.format("Worker %d procesando: %s", worker_id, task.name))
    loop:sleep(task.duration)
    return {
        task = task.name,
        worker = worker_id,
        result = task.value * 2
    }
end

pc:start_workers(process_task)

-- Producer
loop:spawn(function()
    local tasks = {
        {name = "Task1", value = 10, duration = 1},
        {name = "Task2", value = 20, duration = 0.5},
        {name = "Task3", value = 30, duration = 1.5},
        {name = "Task4", value = 40, duration = 0.8},
        {name = "Task5", value = 50, duration = 1.2},
    }

    for _, task in ipairs(tasks) do
        pc:submit(task)
        loop:sleep(0.2)
    end

    pc:stop()
end)

loop:run()

print("\nResultados:")
for _, result in ipairs(pc.results) do
    print(string.format("%s -> %d (worker %d)",
        result.task, result.result, result.worker))
end

3. Pipelines de Datos Asincrónicos

Creemos pipelines para procesar datos en etapas:

-- Pipeline asíncrono
local Pipeline = {}
Pipeline.__index = Pipeline

function Pipeline.new(loop)
    local self = setmetatable({}, Pipeline)
    self.loop = loop
    self.stages = {}
    return self
end

function Pipeline:add_stage(name, fn, workers)
    local stage = {
        name = name,
        fn = fn,
        workers = workers or 1,
        input = Channel.new(10),
        output = Channel.new(10)
    }
    table.insert(self.stages, stage)
    return self
end

function Pipeline:start()
    for i, stage in ipairs(self.stages) do
        local input_ch = stage.input
        local output_ch = stage.output

        -- Si no es la última etapa, conectar con la siguiente
        if i < #self.stages then
            output_ch = self.stages[i + 1].input
        end

        -- Crear workers para esta etapa
        for w = 1, stage.workers do
            self.loop:spawn(function()
                while true do
                    local data = input_ch:receive(self.loop)
                    if data == nil then
                        -- Propagar señal de fin
                        if output_ch ~= stage.output then
                            output_ch:send(nil, self.loop)
                        end
                        break
                    end

                    local result = stage.fn(data, w)
                    if result ~= nil then
                        if output_ch == stage.output then
                            table.insert(stage.output.buffer, result)
                        else
                            output_ch:send(result, self.loop)
                        end
                    end
                end
                print(string.format("Stage '%s' worker %d finalizado", stage.name, w))
            end)
        end
    end
end

function Pipeline:send(data)
    if #self.stages > 0 then
        self.stages[1].input:send(data, self.loop)
    end
end

function Pipeline:close()
    if #self.stages > 0 then
        self.stages[1].input:send(nil, self.loop)
    end
end

function Pipeline:get_results()
    if #self.stages > 0 then
        return self.stages[#self.stages].output.buffer
    end
    return {}
end

-- Ejemplo: Pipeline de procesamiento de datos
local loop = EventLoop.new()
local pipe = Pipeline.new(loop)

pipe:add_stage("parse", function(data, worker)
    print(string.format("[parse:%d] %s", worker, data))
    loop:sleep(0.1)
    return tonumber(data)
end, 2)

pipe:add_stage("transform", function(data, worker)
    print(string.format("[transform:%d] %d", worker, data))
    loop:sleep(0.15)
    return data * data
end, 2)

pipe:add_stage("format", function(data, worker)
    print(string.format("[format:%d] %d", worker, data))
    loop:sleep(0.1)
    return string.format("resultado: %d", data)
end, 1)

pipe:start()

-- Enviar datos al pipeline
loop:spawn(function()
    for i = 1, 5 do
        pipe:send(tostring(i))
        loop:sleep(0.05)
    end
    pipe:close()
end)

loop:run()

print("\nResultados finales:")
for _, result in ipairs(pipe:get_results()) do
    print(result)
end

4. Timeout y Cancelación

Implementemos mecanismos de timeout y cancelación:

-- Sistema de timeout
local Timeout = {}
Timeout.__index = Timeout

function Timeout.new(loop, seconds)
    local self = setmetatable({}, Timeout)
    self.loop = loop
    self.seconds = seconds
    self.cancelled = false
    return self
end

function Timeout:run(fn, ...)
    local args = {...}
    local result = nil
    local completed = false
    local timed_out = false

    -- Coroutine para la tarea
    local task_co = self.loop:spawn(function()
        result = fn(table.unpack(args))
        completed = true
    end)

    -- Coroutine para el timeout
    self.loop:spawn(function()
        self.loop:sleep(self.seconds)
        if not completed and not self.cancelled then
            timed_out = true
            print("Timeout alcanzado")
        end
    end)

    return function()
        return completed, timed_out, result
    end
end

function Timeout:cancel()
    self.cancelled = true
end

-- Context con cancelación
local Context = {}
Context.__index = Context

function Context.new(loop)
    local self = setmetatable({}, Context)
    self.loop = loop
    self.cancelled = false
    self.cancel_callbacks = {}
    return self
end

function Context:is_cancelled()
    return self.cancelled
end

function Context:cancel()
    if self.cancelled then return end
    self.cancelled = true

    for _, callback in ipairs(self.cancel_callbacks) do
        callback()
    end
end

function Context:on_cancel(callback)
    table.insert(self.cancel_callbacks, callback)
end

function Context:with_timeout(seconds)
    local timeout_ctx = Context.new(self.loop)

    -- Heredar cancelación del padre
    self:on_cancel(function()
        timeout_ctx:cancel()
    end)

    -- Configurar timeout
    self.loop:spawn(function()
        self.loop:sleep(seconds)
        timeout_ctx:cancel()
    end)

    return timeout_ctx
end

-- Ejemplo de uso
local loop = EventLoop.new()

-- Tarea con timeout
local function long_running_task(ctx)
    for i = 1, 10 do
        if ctx:is_cancelled() then
            print("Tarea cancelada en iteración", i)
            return nil
        end
        print("Trabajando... iteración", i)
        loop:sleep(0.5)
    end
    return "completado"
end

local ctx = Context.new(loop)
local timeout_ctx = ctx:with_timeout(3)

loop:spawn(function()
    local result = long_running_task(timeout_ctx)
    if result then
        print("Resultado:", result)
    else
        print("Tarea no completada")
    end
end)

loop:run()

5. Manejo de Errores en Código Async

Estrategias para manejar errores en código asíncrono:

-- Future/Promise para manejo de errores
local Future = {}
Future.__index = Future

function Future.new(loop)
    local self = setmetatable({}, Future)
    self.loop = loop
    self.completed = false
    self.success = false
    self.value = nil
    self.error = nil
    self.callbacks = {}
    return self
end

function Future:resolve(value)
    if self.completed then return end
    self.completed = true
    self.success = true
    self.value = value

    for _, callback in ipairs(self.callbacks) do
        if callback.on_success then
            callback.on_success(value)
        end
    end
end

function Future:reject(error)
    if self.completed then return end
    self.completed = true
    self.success = false
    self.error = error

    for _, callback in ipairs(self.callbacks) do
        if callback.on_error then
            callback.on_error(error)
        end
    end
end

function Future:then_do(on_success, on_error)
    local next_future = Future.new(self.loop)

    local callback = {
        on_success = function(value)
            if on_success then
                local success, result = pcall(on_success, value)
                if success then
                    next_future:resolve(result)
                else
                    next_future:reject(result)
                end
            else
                next_future:resolve(value)
            end
        end,
        on_error = function(err)
            if on_error then
                local success, result = pcall(on_error, err)
                if success then
                    next_future:resolve(result)
                else
                    next_future:reject(result)
                end
            else
                next_future:reject(err)
            end
        end
    }

    if self.completed then
        if self.success then
            callback.on_success(self.value)
        else
            callback.on_error(self.error)
        end
    else
        table.insert(self.callbacks, callback)
    end

    return next_future
end

function Future:catch_error(on_error)
    return self:then_do(nil, on_error)
end

-- Función async que retorna Future
local function fetch_data(loop, url, delay)
    local future = Future.new(loop)

    loop:spawn(function()
        loop:sleep(delay)

        -- Simular error random
        if math.random() > 0.7 then
            future:reject("Error de red: " .. url)
        else
            future:resolve({url = url, data = "Datos de " .. url})
        end
    end)

    return future
end

-- Ejemplo de uso
local loop = EventLoop.new()

loop:spawn(function()
    fetch_data(loop, "https://api.example.com/users", 1)
        :then_do(function(response)
            print("Éxito:", response.data)
            return response.data
        end)
        :then_do(function(data)
            print("Procesando:", data)
            return string.upper(data)
        end)
        :then_do(function(result)
            print("Resultado final:", result)
        end)
        :catch_error(function(err)
            print("Error capturado:", err)
        end)
end)

loop:run()

6. DEEP DIVE: Event Loop Internals

Analicemos cómo funciona un event loop real:

--[[
ESTRUCTURA INTERNA DE UN EVENT LOOP

1. Task Queue:
   - Cola de tareas listas para ejecutar
   - Cada tarea es una coroutine con estado
   - Estados: ready, waiting, blocked, dead

2. Timer Wheel:
   - Estructura de datos para timers eficientes
   - Organiza timers en "buckets" por tiempo
   - O(1) para inserción, O(1) amortizado para tick

3. I/O Selector:
   - En sistemas reales: epoll (Linux), kqueue (BSD), IOCP (Windows)
   - Notifica cuando un descriptor está listo
   - Integrado con el event loop principal

4. Microtask Queue:
   - Para callbacks de alta prioridad
   - Se ejecutan antes de tareas normales
   - Evita starvation con límite de ejecución
]]

-- Event Loop avanzado con microtasks
local AdvancedEventLoop = {}
AdvancedEventLoop.__index = AdvancedEventLoop

function AdvancedEventLoop.new()
    local self = setmetatable({}, AdvancedEventLoop)
    self.tasks = {}
    self.microtasks = {}
    self.timers = {}
    self.io_watchers = {}
    self.running = false
    self.tick_count = 0
    return self
end

function AdvancedEventLoop:queue_microtask(fn)
    table.insert(self.microtasks, {
        fn = fn,
        created_at = self.tick_count
    })
end

function AdvancedEventLoop:process_microtasks()
    local count = 0
    local max_microtasks = 100 -- Prevenir starvation

    while #self.microtasks > 0 and count < max_microtasks do
        local microtask = table.remove(self.microtasks, 1)
        local success, err = pcall(microtask.fn)
        if not success then
            print("Error en microtask:", err)
        end
        count = count + 1
    end

    if #self.microtasks > 0 then
        print(string.format("Advertencia: %d microtasks pendientes", #self.microtasks))
    end
end

function AdvancedEventLoop:tick()
    self.tick_count = self.tick_count + 1

    -- 1. Procesar microtasks primero
    self:process_microtasks()

    -- 2. Procesar tareas normales
    local processed = 0
    local i = 1
    while i <= #self.tasks and processed < 10 do
        local task = self.tasks[i]
        if task.status == "ready" then
            local success, err = coroutine.resume(task.coroutine)
            if not success then
                print("Error en tarea:", err)
                table.remove(self.tasks, i)
            elseif coroutine.status(task.coroutine) == "dead" then
                table.remove(self.tasks, i)
            else
                i = i + 1
            end
            processed = processed + 1
        else
            i = i + 1
        end
    end

    -- 3. Procesar timers
    local current_time = os.clock()
    for i = #self.timers, 1, -1 do
        local timer = self.timers[i]
        if current_time >= timer.wake_time then
            self:queue_microtask(function()
                for _, task in ipairs(self.tasks) do
                    if task.coroutine == timer.coroutine then
                        task.status = "ready"
                        break
                    end
                end
            end)
            table.remove(self.timers, i)
        end
    end
end

-- Métricas del event loop
function AdvancedEventLoop:get_metrics()
    return {
        tick_count = self.tick_count,
        tasks = #self.tasks,
        microtasks = #self.microtasks,
        timers = #self.timers,
        io_watchers = #self.io_watchers
    }
end

print([[
OPTIMIZACIONES COMUNES:

1. Timer Wheel: O(1) scheduling
   - Array circular de buckets
   - Cada bucket contiene timers para ese intervalo
   - Puntero avanza cada tick

2. I/O Multiplexing:
   - select(): Portable pero limitado (1024 fds)
   - epoll(): Linux, eficiente para muchos fds
   - kqueue(): BSD/macOS, edge-triggered
   - IOCP(): Windows, completion-based

3. Task Scheduling:
   - Round-robin para fairness
   - Priority queues para prioridades
   - Work stealing para balance de carga

4. Memory Management:
   - Pool de coroutines reutilizables
   - Buffer pools para I/O
   - Weak tables para caches
]])

7. SOAPBOX: Lua Coroutines vs Node.js

Mi opinión sobre diferentes modelos de concurrencia:

print([[
═══════════════════════════════════════════════════════════════
  SOAPBOX: Lua Coroutines vs Node.js Event Loop
═══════════════════════════════════════════════════════════════

Lua Coroutines:
✓ Control explícito de concurrencia
✓ Sin callback hell
✓ Stack completo por coroutine
✓ Debugging más simple
✗ Require diseño cuidadoso
✗ Sin ecosistema async masivo

Node.js Event Loop:
✓ Ecosistema masivo (npm)
✓ Async/await nativo
✓ I/O no bloqueante por defecto
✗ Callback hell (pre-async/await)
✗ Stack traces fragmentados
✗ Menos control sobre scheduling

VEREDICTO:
Lua coroutines ofrecen un modelo MÁS SIMPLE y PREDECIBLE que
callbacks o promesas. Con async/await, la brecha se reduce,
pero Lua sigue ganando en:

1. Simplicidad conceptual
2. Control fino de ejecución
3. Debugging más claro
4. Menor overhead de memoria

Node.js gana en ecosistema y herramientas, pero si buscas
entender concurrencia profundamente, Lua es mejor profesor.

La belleza de Lua es que puedes construir CUALQUIER modelo
de concurrencia sobre coroutines: actors, CSP, futures,
streams... Es un lienzo en blanco.
═══════════════════════════════════════════════════════════════
]])

-- Comparación de código
print("\n-- CALLBACK HELL (Node.js style):")
print([[
fetchUser(userId, function(err, user) {
    if (err) return handleError(err);
    fetchPosts(user.id, function(err, posts) {
        if (err) return handleError(err);
        fetchComments(posts[0].id, function(err, comments) {
            if (err) return handleError(err);
            render(user, posts, comments);
        });
    });
});
]])

print("\n-- LUA COROUTINES (limpio y secuencial):")
print([[
local user = fetchUser(userId)
local posts = fetchPosts(user.id)
local comments = fetchComments(posts[1].id)
render(user, posts, comments)
]])

print("\n-- ASYNC/AWAIT (Node.js moderno):")
print([[
try {
    const user = await fetchUser(userId);
    const posts = await fetchPosts(user.id);
    const comments = await fetchComments(posts[0].id);
    render(user, posts, comments);
} catch (err) {
    handleError(err);
}
]])

print("\n¡Lua lo tenía resuelto desde los 90s!")

8. Caso Práctico: Web Scraper Asíncrono

Implementemos un web scraper simple usando nuestro event loop:

-- Web scraper asíncrono (simulado)
local WebScraper = {}
WebScraper.__index = WebScraper

function WebScraper.new(loop, max_concurrent)
    local self = setmetatable({}, WebScraper)
    self.loop = loop
    self.max_concurrent = max_concurrent or 3
    self.pending = {}
    self.results = {}
    self.active = 0
    return self
end

function WebScraper:fetch_url(url)
    -- Simular HTTP request
    local future = Future.new(self.loop)

    self.loop:spawn(function()
        local delay = math.random() * 2 + 0.5
        self.loop:sleep(delay)

        if math.random() > 0.1 then
            future:resolve({
                url = url,
                status = 200,
                body = string.format("Contenido de %s", url),
                links = self:extract_links(url)
            })
        else
            future:reject("Error 404: " .. url)
        end
    end)

    return future
end

function WebScraper:extract_links(url)
    -- Simular extracción de links
    local num_links = math.random(2, 5)
    local links = {}
    for i = 1, num_links do
        table.insert(links, string.format("%s/page%d", url, i))
    end
    return links
end

function WebScraper:crawl(start_url, max_depth)
    local visited = {}
    local queue = {{url = start_url, depth = 0}}

    local function process_next()
        if #queue == 0 then return end

        local item = table.remove(queue, 1)
        if visited[item.url] or item.depth > max_depth then
            process_next()
            return
        end

        visited[item.url] = true
        self.active = self.active + 1

        print(string.format("Fetching [depth=%d]: %s", item.depth, item.url))

        self:fetch_url(item.url)
            :then_do(function(response)
                table.insert(self.results, {
                    url = item.url,
                    depth = item.depth,
                    status = response.status
                })

                if item.depth < max_depth then
                    for _, link in ipairs(response.links) do
                        if not visited[link] then
                            table.insert(queue, {url = link, depth = item.depth + 1})
                        end
                    end
                end

                self.active = self.active - 1
                if self.active < self.max_concurrent then
                    process_next()
                end
            end)
            :catch_error(function(err)
                print("Error:", err)
                self.active = self.active - 1
                process_next()
            end)
    end

    -- Iniciar crawlers concurrentes
    for i = 1, self.max_concurrent do
        self.loop:spawn(function()
            process_next()
        end)
    end
end

-- Ejemplo de uso
local loop = AdvancedEventLoop.new()
local scraper = WebScraper.new(loop, 3)

scraper:crawl("https://example.com", 2)

-- Correr hasta que termine
loop:spawn(function()
    while scraper.active > 0 or #scraper.pending > 0 do
        loop:sleep(0.1)
    end
    loop:stop()
end)

loop:run()

print("\n=== RESULTADOS DEL SCRAPING ===")
for _, result in ipairs(scraper.results) do
    print(string.format("[depth=%d] %s -> %d",
        result.depth, result.url, result.status))
end

9. Ejercicios

Ejercicio 1: Rate Limiter Asíncrono

Implementa un rate limiter que controle cuántas operaciones se pueden ejecutar por segundo:

--[[
Crea un RateLimiter que:
1. Limite operaciones a N por segundo
2. Encole operaciones excedentes
3. Use token bucket o leaky bucket algorithm
4. Soporte múltiples clientes con diferentes límites

Ejemplo:
local limiter = RateLimiter.new(loop, 5) -- 5 ops/sec
for i = 1, 20 do
    limiter:execute(function()
        print("Operación", i)
    end)
end

BONUS: Implementa burst capacity (ráfagas)
]]

Ejercicio 2: Async Pool con Retry Logic

Crea un pool de workers con reintentos automáticos:

--[[
Implementa AsyncPool con:
1. Pool de N workers
2. Retry automático con backoff exponencial
3. Circuit breaker para fallos continuos
4. Métricas: éxitos, fallos, reintentos

Ejemplo:
local pool = AsyncPool.new(loop, {
    workers = 3,
    max_retries = 3,
    backoff_base = 2
})

pool:submit(function()
    -- Tarea que puede fallar
    if math.random() > 0.7 then
        error("Fallo temporal")
    end
    return "éxito"
end)

print(pool:get_metrics())
]]

Ejercicio 3: Reactive Stream

Implementa un sistema de streams reactivos estilo RxJS:

--[[
Crea Observable con operadores:
1. map, filter, reduce
2. debounce, throttle
3. merge, concat, zip
4. error handling y retry

Ejemplo:
local clicks = Observable.from_events(button, "click")
clicks
    :debounce(300)
    :map(function(e) return e.x end)
    :filter(function(x) return x > 100 end)
    :subscribe(function(x)
        print("Click válido en x:", x)
    end)

BONUS: Implementa hot vs cold observables
]]

Recursos Adicionales

Conclusión

La programación asíncrona en Lua es elegante y poderosa gracias a las coroutines. Hemos visto cómo construir event loops, implementar patrones async comunes, y crear sistemas concurrentes sin la complejidad de threads o callbacks anidados.

En el próximo capítulo exploraremos Testing Avanzado, donde aprenderemos a escribir tests robustos para código asíncrono, mocking, property-based testing, y más.


Próximo capítulo: Testing Avanzado - Mocks, spies, property-based testing, y coverage.