← Volver al listado de tecnologías
Concurrencia en Zig
Concurrencia
Threads Básicos
const std = @import("std");
/// Worker entry point: prints a start line, then five iteration lines with a
/// 100 ms sleep after each one, and finally a completion line.
///
/// `id` is only used to label the printed lines.
fn trabajador(id: usize) void {
    const pausa_ns = 100 * std.time.ns_per_ms;

    std.debug.print("Thread {d} iniciado\n", .{id});

    // Simulate work: one message per step followed by a short pause.
    for (0..5) |paso| {
        std.debug.print("Thread {d}: iteración {d}\n", .{ id, paso });
        std.time.sleep(pausa_ns);
    }

    std.debug.print("Thread {d} terminado\n", .{id});
}
/// Spawns four `trabajador` threads, waits for all of them, then prints a
/// final message.
///
/// Propagates any error returned by `std.Thread.spawn`.
pub fn main() !void {
    const num_threads = 4;
    var threads: [num_threads]std.Thread = undefined;

    // Launch one worker per slot; the slot index doubles as the worker id.
    for (&threads, 0..) |*hilo, id| {
        hilo.* = try std.Thread.spawn(.{}, trabajador, .{id});
    }

    // Block until every worker has finished.
    for (threads) |hilo| hilo.join();

    std.debug.print("Todos los threads terminaron\n", .{});
}
Mutex
const std = @import("std");
/// Integer counter whose methods take a mutex before touching the value.
const Contador = struct {
    const Self = @This();

    /// Current count; the methods below access it only while holding `mutex`.
    valor: i32 = 0,
    /// Lock taken by `incrementar` and `obtener`.
    mutex: std.Thread.Mutex = .{},

    /// Adds one to the counter while holding the lock.
    pub fn incrementar(self: *Self) void {
        const candado = &self.mutex;
        candado.lock();
        defer candado.unlock();

        self.valor += 1;
    }

    /// Returns the current count, read while holding the lock.
    pub fn obtener(self: *Self) i32 {
        const candado = &self.mutex;
        candado.lock();
        defer candado.unlock();

        return self.valor;
    }
};
/// Thread body: calls `contador.incrementar()` exactly `iteraciones` times.
fn trabajadorContador(contador: *Contador, iteraciones: usize) void {
    var pendientes = iteraciones;
    while (pendientes > 0) : (pendientes -= 1) {
        contador.incrementar();
    }
}
/// Runs four threads that each increment a shared `Contador` 10000 times,
/// then prints the resulting value followed by the expected total.
///
/// Propagates any error returned by `std.Thread.spawn`.
pub fn main() !void {
    const num_threads = 4;
    const iteraciones = 10000;

    var contador: Contador = .{};
    var threads: [num_threads]std.Thread = undefined;

    // Every worker receives a pointer to the same counter.
    for (&threads) |*hilo| {
        hilo.* = try std.Thread.spawn(.{}, trabajadorContador, .{ &contador, iteraciones });
    }
    for (threads) |hilo| hilo.join();

    const esperado = num_threads * iteraciones;
    std.debug.print("Valor final: {d}\n", .{contador.obtener()});
    std.debug.print("Esperado: {d}\n", .{esperado});
}
Atomic Operations
const std = @import("std");
/// Counter stored in a single atomic `i32`; no mutex is involved.
const ContadorAtomico = struct {
    const Self = @This();
    const Celda = std.atomic.Value(i32);

    /// Atomic storage for the count, initialised to zero.
    valor: Celda = Celda.init(0),

    /// Atomically adds one using `.seq_cst` ordering; the previous value is
    /// discarded.
    pub fn incrementar(self: *Self) void {
        _ = self.valor.fetchAdd(1, .seq_cst);
    }

    /// Atomically loads the current count using `.seq_cst` ordering.
    pub fn obtener(self: *const Self) i32 {
        return self.valor.load(.seq_cst);
    }
};
/// Thread body: calls `contador.incrementar()` exactly `iteraciones` times.
fn trabajadorAtomico(contador: *ContadorAtomico, iteraciones: usize) void {
    var hechas: usize = 0;
    while (hechas < iteraciones) : (hechas += 1) {
        contador.incrementar();
    }
}
/// Runs four threads that each increment a shared `ContadorAtomico` 10000
/// times, then prints the final value.
///
/// Propagates any error returned by `std.Thread.spawn`.
pub fn main() !void {
    const num_threads = 4;
    const iteraciones = 10000;

    var contador: ContadorAtomico = .{};
    var threads: [num_threads]std.Thread = undefined;

    // Every worker receives a pointer to the same counter.
    for (&threads) |*hilo| {
        hilo.* = try std.Thread.spawn(.{}, trabajadorAtomico, .{ &contador, iteraciones });
    }
    for (threads) |hilo| hilo.join();

    std.debug.print("Valor final: {d}\n", .{contador.obtener()});
}
Condition Variables
const std = @import("std");
/// Queue of `i32` values guarded by a mutex. `pop` blocks while the queue is
/// empty until a value is pushed or `terminar` is called.
const Cola = struct {
    const Self = @This();
    const Lista = std.ArrayList(i32);

    /// Pending values; `pop` removes from index 0, `push` appends at the end.
    items: Lista,
    /// Held by `push`, `pop` and `terminar` while they touch the fields below.
    mutex: std.Thread.Mutex = .{},
    /// Signalled by `push` and broadcast by `terminar`.
    no_vacia: std.Thread.Condition = .{},
    /// Set by `terminar`; once true, `pop` returns null when the queue is empty.
    terminado: bool = false,

    /// Creates an empty queue whose list uses `allocator`.
    pub fn init(allocator: std.mem.Allocator) Cola {
        const vacia = Lista.init(allocator);
        return Cola{ .items = vacia };
    }

    /// Releases the list storage.
    pub fn deinit(self: *Self) void {
        self.items.deinit();
    }

    /// Appends `valor` and signals one waiter.
    /// Returns the error from `append`; in that case no signal is sent.
    pub fn push(self: *Self, valor: i32) !void {
        self.mutex.lock();
        defer self.mutex.unlock();

        try self.items.append(valor);
        self.no_vacia.signal();
    }

    /// Removes and returns the oldest value, waiting while the queue is empty.
    /// Returns null once the queue is empty and `terminar` has been called.
    pub fn pop(self: *Self) ?i32 {
        self.mutex.lock();
        defer self.mutex.unlock();

        // Re-check the condition after every wake-up.
        while (self.items.items.len == 0) {
            if (self.terminado) return null;
            self.no_vacia.wait(&self.mutex);
        }
        return self.items.orderedRemove(0);
    }

    /// Marks the queue as finished and wakes every waiter.
    pub fn terminar(self: *Self) void {
        self.mutex.lock();
        defer self.mutex.unlock();

        self.terminado = true;
        self.no_vacia.broadcast();
    }
};
/// Producer thread body: pushes the values 0 through 9 into `cola`, sleeping
/// 50 ms after each attempt, and then closes the queue with `terminar`.
///
/// A failed push is reported through `std.log.err` instead of being silently
/// discarded; production continues with the next value.
fn productor(cola: *Cola) void {
    // Close the queue on every exit path so waiting consumers are released.
    defer cola.terminar();

    for (0..10) |i| {
        cola.push(@intCast(i)) catch |err| {
            std.log.err("productor: push({d}) falló: {s}", .{ i, @errorName(err) });
        };
        std.time.sleep(50 * std.time.ns_per_ms);
    }
}
/// Consumer thread body: prints every value obtained from `cola.pop()` and
/// stops as soon as `pop` returns null.
///
/// `id` is only used to label the printed lines.
fn consumidor(cola: *Cola, id: usize) void {
    while (true) {
        const valor = cola.pop() orelse break;
        std.debug.print("Consumidor {d}: {d}\n", .{ id, valor });
    }
}
/// Wires one `productor` thread and two `consumidor` threads (ids 1 and 2) to
/// a shared `Cola`, then waits for all three.
///
/// Propagates any error returned by `std.Thread.spawn`.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var cola = Cola.init(allocator);
    defer cola.deinit();

    const prod = try std.Thread.spawn(.{}, productor, .{&cola});

    // Consumer ids start at 1.
    var consumidores: [2]std.Thread = undefined;
    for (&consumidores, 1..) |*hilo, id| {
        hilo.* = try std.Thread.spawn(.{}, consumidor, .{ &cola, id });
    }

    prod.join();
    for (consumidores) |hilo| hilo.join();
}
Thread Pool Simple
const std = @import("std");
/// Unit of work for the pool: a callback plus the opaque pointer passed to it.
const Tarea = struct {
    /// Function invoked by a worker thread.
    funcion: *const fn (*anyopaque) void,
    /// Argument handed to `funcion`; the pool never dereferences it.
    datos: *anyopaque,
};

/// Fixed-size pool of worker threads that run submitted tasks in FIFO order.
///
/// The state shared with the workers is allocated on the heap so that its
/// address stays valid when the `ThreadPool` value itself is returned from
/// `init`, copied, or moved. (Handing workers a pointer to a local inside
/// `init` would leave them with a dangling pointer after `init` returns.)
const ThreadPool = struct {
    /// Heap-allocated state shared with every worker.
    estado: *Estado,
    /// Allocator used for `estado`, the thread slice and the task list.
    allocator: std.mem.Allocator,

    /// Everything the workers touch. Apart from `threads`, fields are accessed
    /// only while `mutex` is held.
    const Estado = struct {
        threads: []std.Thread,
        tareas: std.ArrayList(Tarea),
        mutex: std.Thread.Mutex = .{},
        condicion: std.Thread.Condition = .{},
        terminando: bool = false,
    };

    /// Creates the pool and starts `num_threads` workers.
    ///
    /// On failure (allocation or thread spawn) every worker that was already
    /// started is stopped and joined and all memory is released before the
    /// error is returned.
    pub fn init(allocator: std.mem.Allocator, num_threads: usize) !ThreadPool {
        const estado = try allocator.create(Estado);
        errdefer allocator.destroy(estado);

        const threads = try allocator.alloc(std.Thread, num_threads);
        errdefer allocator.free(threads);

        estado.* = .{
            .threads = threads,
            .tareas = std.ArrayList(Tarea).init(allocator),
        };

        var iniciados: usize = 0;
        // Runs before the two errdefers above (LIFO): stop and join the
        // workers that did start so none of them outlives `estado`.
        errdefer {
            estado.mutex.lock();
            estado.terminando = true;
            estado.condicion.broadcast();
            estado.mutex.unlock();
            for (threads[0..iniciados]) |t| t.join();
        }

        while (iniciados < num_threads) : (iniciados += 1) {
            threads[iniciados] = try std.Thread.spawn(.{}, workerLoop, .{estado});
        }

        return .{ .estado = estado, .allocator = allocator };
    }

    /// Asks the workers to stop, waits for them, and frees all pool memory.
    ///
    /// Workers only exit once the task list is empty, so tasks submitted
    /// before this call are still executed.
    pub fn deinit(self: *ThreadPool) void {
        const estado = self.estado;

        estado.mutex.lock();
        estado.terminando = true;
        estado.condicion.broadcast();
        estado.mutex.unlock();

        for (estado.threads) |t| t.join();

        self.allocator.free(estado.threads);
        estado.tareas.deinit();
        self.allocator.destroy(estado);
        self.* = undefined;
    }

    /// Worker body: repeatedly takes the oldest task and runs it outside the
    /// lock; returns when shutdown was requested and no tasks remain.
    fn workerLoop(estado: *Estado) void {
        while (true) {
            const tarea = siguiente: {
                estado.mutex.lock();
                defer estado.mutex.unlock();

                while (estado.tareas.items.len == 0 and !estado.terminando) {
                    estado.condicion.wait(&estado.mutex);
                }
                // The wait loop only exits with an empty list when
                // `terminando` is set.
                if (estado.tareas.items.len == 0) return;

                break :siguiente estado.tareas.orderedRemove(0);
            };

            tarea.funcion(tarea.datos);
        }
    }

    /// Queues `funcion(datos)` for execution and wakes one worker.
    ///
    /// Returns the error from appending to the task list; in that case the
    /// task is not queued.
    pub fn submit(self: *ThreadPool, funcion: *const fn (*anyopaque) void, datos: *anyopaque) !void {
        const estado = self.estado;

        estado.mutex.lock();
        defer estado.mutex.unlock();

        try estado.tareas.append(.{ .funcion = funcion, .datos = datos });
        estado.condicion.signal();
    }
};
Testing de Concurrencia
const std = @import("std");
const testing = std.testing;
/// Counter stored in a single atomic `i32`, supporting increment, decrement
/// and read.
const ContadorThread = struct {
    const Self = @This();
    const Celda = std.atomic.Value(i32);

    /// Atomic storage for the count, initialised to zero.
    valor: Celda = Celda.init(0),

    /// Atomically adds one using `.seq_cst` ordering.
    pub fn incrementar(self: *Self) void {
        _ = self.valor.fetchAdd(1, .seq_cst);
    }

    /// Atomically subtracts one using `.seq_cst` ordering.
    pub fn decrementar(self: *Self) void {
        _ = self.valor.fetchSub(1, .seq_cst);
    }

    /// Atomically loads the current count using `.seq_cst` ordering.
    pub fn obtener(self: *const Self) i32 {
        return self.valor.load(.seq_cst);
    }
};
/// Calls `contador.incrementar()` exactly `veces` times.
fn incrementarVeces(contador: *ContadorThread, veces: usize) void {
    var restantes = veces;
    while (restantes > 0) : (restantes -= 1) {
        contador.incrementar();
    }
}
/// Calls `contador.decrementar()` exactly `veces` times.
fn decrementarVeces(contador: *ContadorThread, veces: usize) void {
    var restantes = veces;
    while (restantes > 0) : (restantes -= 1) {
        contador.decrementar();
    }
}
// Single-threaded check: three increments give 3, one decrement gives 2.
test "contador atómico básico" {
    var contador: ContadorThread = .{};

    for (0..3) |_| contador.incrementar();
    try testing.expectEqual(@as(i32, 3), contador.obtener());

    contador.decrementar();
    try testing.expectEqual(@as(i32, 2), contador.obtener());
}
// Four threads increment the same counter 1000 times each; the total must be
// exactly 4000.
test "contador atómico multihilo" {
    const num_threads = 4;
    const iteraciones = 1000;
    const esperado: i32 = num_threads * iteraciones;

    var contador: ContadorThread = .{};
    var threads: [num_threads]std.Thread = undefined;

    for (&threads) |*hilo| {
        hilo.* = try std.Thread.spawn(.{}, incrementarVeces, .{ &contador, iteraciones });
    }
    for (threads) |hilo| hilo.join();

    try testing.expectEqual(esperado, contador.obtener());
}
// One thread increments while another decrements the same number of times;
// the counter must end at zero.
test "incrementar y decrementar concurrente" {
    const iteraciones = 1000;
    var contador: ContadorThread = .{};

    const sube = try std.Thread.spawn(.{}, incrementarVeces, .{ &contador, iteraciones });
    const baja = try std.Thread.spawn(.{}, decrementarVeces, .{ &contador, iteraciones });
    sube.join();
    baja.join();

    // Equal numbers of increments and decrements cancel out.
    const final: i32 = contador.obtener();
    try testing.expectEqual(@as(i32, 0), final);
}
// Four threads perform 1000 mutex-protected increments each on a plain `i32`;
// the total must be exactly 4000.
test "mutex protege sección crítica" {
    const num_threads = 4;
    const iteraciones = 1000;

    const Datos = struct {
        valor: i32 = 0,
        mutex: std.Thread.Mutex = .{},

        // Increments `valor` while holding `mutex`.
        fn incrementarSeguro(self: *@This()) void {
            self.mutex.lock();
            defer self.mutex.unlock();
            self.valor += 1;
        }
    };

    // Thread body kept in a named namespace instead of an inline anonymous struct.
    const Hilo = struct {
        fn run(d: *Datos) void {
            var k: usize = 0;
            while (k < iteraciones) : (k += 1) {
                d.incrementarSeguro();
            }
        }
    };

    var datos: Datos = .{};
    var threads: [num_threads]std.Thread = undefined;

    for (&threads) |*t| {
        t.* = try std.Thread.spawn(.{}, Hilo.run, .{&datos});
    }
    for (threads) |t| t.join();

    // All workers have been joined, so reading `valor` without the lock is safe here.
    const esperado: i32 = num_threads * iteraciones;
    try testing.expectEqual(esperado, datos.valor);
}
Ejercicios
- Implementa un pipeline productor-consumidor con múltiples etapas
- Crea un rate limiter thread-safe
- Implementa un cache concurrente con lecturas paralelas
Resumen
- `std.Thread.spawn` crea nuevos threads
- `std.Thread.Mutex` protege secciones críticas
- `std.atomic.Value` para operaciones atómicas
- `std.Thread.Condition` para sincronización avanzada
- `defer` facilita el unlock automático