← Volver al listado de tecnologías

Concurrencia en Zig

Por: Artiko
zigconcurrenciathreadsasync

Concurrencia

Threads Básicos

const std = @import("std");

fn trabajador(id: usize) void {
    std.debug.print("Thread {d} iniciado\n", .{id});

    // Simular trabajo
    var i: usize = 0;
    while (i < 5) : (i += 1) {
        std.debug.print("Thread {d}: iteración {d}\n", .{ id, i });
        std.time.sleep(100 * std.time.ns_per_ms);
    }

    std.debug.print("Thread {d} terminado\n", .{id});
}

pub fn main() !void {
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;

    // Crear threads
    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(.{}, trabajador, .{i});
    }

    // Esperar a que terminen
    for (threads) |t| {
        t.join();
    }

    std.debug.print("Todos los threads terminaron\n", .{});
}

Mutex

const std = @import("std");

const Contador = struct {
    valor: i32 = 0,
    mutex: std.Thread.Mutex = .{},

    pub fn incrementar(self: *Contador) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.valor += 1;
    }

    pub fn obtener(self: *Contador) i32 {
        self.mutex.lock();
        defer self.mutex.unlock();
        return self.valor;
    }
};

fn trabajadorContador(contador: *Contador, iteraciones: usize) void {
    for (0..iteraciones) |_| {
        contador.incrementar();
    }
}

pub fn main() !void {
    var contador = Contador{};
    const num_threads = 4;
    const iteraciones = 10000;
    var threads: [num_threads]std.Thread = undefined;

    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(.{}, trabajadorContador, .{ &contador, iteraciones });
    }

    for (threads) |t| {
        t.join();
    }

    std.debug.print("Valor final: {d}\n", .{contador.obtener()});
    std.debug.print("Esperado: {d}\n", .{num_threads * iteraciones});
}

Atomic Operations

const std = @import("std");

const ContadorAtomico = struct {
    valor: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),

    pub fn incrementar(self: *ContadorAtomico) void {
        _ = self.valor.fetchAdd(1, .seq_cst);
    }

    pub fn obtener(self: *const ContadorAtomico) i32 {
        return self.valor.load(.seq_cst);
    }
};

fn trabajadorAtomico(contador: *ContadorAtomico, iteraciones: usize) void {
    for (0..iteraciones) |_| {
        contador.incrementar();
    }
}

pub fn main() !void {
    var contador = ContadorAtomico{};
    const num_threads = 4;
    const iteraciones = 10000;
    var threads: [num_threads]std.Thread = undefined;

    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(.{}, trabajadorAtomico, .{ &contador, iteraciones });
    }

    for (threads) |t| {
        t.join();
    }

    std.debug.print("Valor final: {d}\n", .{contador.obtener()});
}

Condition Variables

const std = @import("std");

const Cola = struct {
    items: std.ArrayList(i32),
    mutex: std.Thread.Mutex = .{},
    no_vacia: std.Thread.Condition = .{},
    terminado: bool = false,

    pub fn init(allocator: std.mem.Allocator) Cola {
        return .{
            .items = std.ArrayList(i32).init(allocator),
        };
    }

    pub fn deinit(self: *Cola) void {
        self.items.deinit();
    }

    pub fn push(self: *Cola, valor: i32) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.items.append(valor);
        self.no_vacia.signal();
    }

    pub fn pop(self: *Cola) ?i32 {
        self.mutex.lock();
        defer self.mutex.unlock();

        while (self.items.items.len == 0 and !self.terminado) {
            self.no_vacia.wait(&self.mutex);
        }

        if (self.items.items.len == 0) return null;
        return self.items.orderedRemove(0);
    }

    pub fn terminar(self: *Cola) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        self.terminado = true;
        self.no_vacia.broadcast();
    }
};

fn productor(cola: *Cola) void {
    for (0..10) |i| {
        cola.push(@intCast(i)) catch {};
        std.time.sleep(50 * std.time.ns_per_ms);
    }
    cola.terminar();
}

fn consumidor(cola: *Cola, id: usize) void {
    while (cola.pop()) |valor| {
        std.debug.print("Consumidor {d}: {d}\n", .{ id, valor });
    }
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var cola = Cola.init(gpa.allocator());
    defer cola.deinit();

    const prod = try std.Thread.spawn(.{}, productor, .{&cola});
    const cons1 = try std.Thread.spawn(.{}, consumidor, .{ &cola, 1 });
    const cons2 = try std.Thread.spawn(.{}, consumidor, .{ &cola, 2 });

    prod.join();
    cons1.join();
    cons2.join();
}

Thread Pool Simple

const std = @import("std");

const Tarea = struct {
    funcion: *const fn (*anyopaque) void,
    datos: *anyopaque,
};

const ThreadPool = struct {
    threads: []std.Thread,
    tareas: std.ArrayList(Tarea),
    mutex: std.Thread.Mutex = .{},
    condicion: std.Thread.Condition = .{},
    terminando: bool = false,
    allocator: std.mem.Allocator,

    pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
        var pool = ThreadPool{
            .threads = try allocator.alloc(std.Thread, num_threads),
            .tareas = std.ArrayList(Tarea).init(allocator),
            .allocator = allocator,
        };

        for (pool.threads) |*t| {
            t.* = try std.Thread.spawn(.{}, workerLoop, .{&pool});
        }

        return pool;
    }

    pub fn deinit(self: *ThreadPool) void {
        self.mutex.lock();
        self.terminando = true;
        self.condicion.broadcast();
        self.mutex.unlock();

        for (self.threads) |t| {
            t.join();
        }

        self.allocator.free(self.threads);
        self.tareas.deinit();
    }

    fn workerLoop(self: *ThreadPool) void {
        while (true) {
            self.mutex.lock();

            while (self.tareas.items.len == 0 and !self.terminando) {
                self.condicion.wait(&self.mutex);
            }

            if (self.terminando and self.tareas.items.len == 0) {
                self.mutex.unlock();
                return;
            }

            const tarea = self.tareas.orderedRemove(0);
            self.mutex.unlock();

            tarea.funcion(tarea.datos);
        }
    }

    pub fn submit(self: *ThreadPool, funcion: *const fn (*anyopaque) void, datos: *anyopaque) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.tareas.append(.{ .funcion = funcion, .datos = datos });
        self.condicion.signal();
    }
};

Testing de Concurrencia

const std = @import("std");
const testing = std.testing;

const ContadorThread = struct {
    valor: std.atomic.Value(i32) = std.atomic.Value(i32).init(0),

    pub fn incrementar(self: *ContadorThread) void {
        _ = self.valor.fetchAdd(1, .seq_cst);
    }

    pub fn decrementar(self: *ContadorThread) void {
        _ = self.valor.fetchSub(1, .seq_cst);
    }

    pub fn obtener(self: *const ContadorThread) i32 {
        return self.valor.load(.seq_cst);
    }
};

fn incrementarVeces(contador: *ContadorThread, veces: usize) void {
    for (0..veces) |_| {
        contador.incrementar();
    }
}

fn decrementarVeces(contador: *ContadorThread, veces: usize) void {
    for (0..veces) |_| {
        contador.decrementar();
    }
}

test "contador atómico básico" {
    var contador = ContadorThread{};

    contador.incrementar();
    contador.incrementar();
    contador.incrementar();

    try testing.expectEqual(@as(i32, 3), contador.obtener());

    contador.decrementar();
    try testing.expectEqual(@as(i32, 2), contador.obtener());
}

test "contador atómico multihilo" {
    var contador = ContadorThread{};
    const num_threads = 4;
    const iteraciones = 1000;
    var threads: [num_threads]std.Thread = undefined;

    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(.{}, incrementarVeces, .{ &contador, iteraciones });
    }

    for (threads) |t| {
        t.join();
    }

    try testing.expectEqual(@as(i32, @intCast(num_threads * iteraciones)), contador.obtener());
}

test "incrementar y decrementar concurrente" {
    var contador = ContadorThread{};
    const iteraciones = 1000;

    const t1 = try std.Thread.spawn(.{}, incrementarVeces, .{ &contador, iteraciones });
    const t2 = try std.Thread.spawn(.{}, decrementarVeces, .{ &contador, iteraciones });

    t1.join();
    t2.join();

    // Después de igual número de incrementos y decrementos, debe ser 0
    try testing.expectEqual(@as(i32, 0), contador.obtener());
}

test "mutex protege sección crítica" {
    const Datos = struct {
        valor: i32 = 0,
        mutex: std.Thread.Mutex = .{},

        fn incrementarSeguro(self: *@This()) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.valor += 1;
        }
    };

    var datos = Datos{};
    const num_threads = 4;
    const iteraciones = 1000;
    var threads: [num_threads]std.Thread = undefined;

    for (0..num_threads) |i| {
        threads[i] = try std.Thread.spawn(.{}, struct {
            fn run(d: *Datos) void {
                for (0..iteraciones) |_| {
                    d.incrementarSeguro();
                }
            }
        }.run, .{&datos});
    }

    for (threads) |t| {
        t.join();
    }

    try testing.expectEqual(@as(i32, @intCast(num_threads * iteraciones)), datos.valor);
}

Ejercicios

  1. Implementa un pipeline productor-consumidor con múltiples etapas
  2. Crea un rate limiter thread-safe
  3. Implementa un cache concurrente con lecturas paralelas

Resumen