diff --git a/Makefile b/Makefile
index a37a778..f0ac286 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,7 @@ tb:
 
 .PHONY: s
 s:
-	zig build run -freference-trace
+	zig build example_1 -freference-trace
 
 .phony: d
 d:
diff --git a/src/thread_pool.zig b/src/thread_pool.zig
index a6e6a58..b1730ba 100644
--- a/src/thread_pool.zig
+++ b/src/thread_pool.zig
@@ -40,16 +40,20 @@ pub fn ThreadPool(comptime F: anytype) type {
 	});
 
 	return struct {
-		push: usize,
-		pull: usize,
-		stopped: bool,
-		pending: usize,
+		// position in the queue to read from
+		tail: usize,
+
+		// position in the queue to write to
+		head: usize,
+
+		// pending jobs
 		queue: []Args,
+
+		stopped: bool,
 		threads: []Thread,
 		mutex: Thread.Mutex,
-		pull_cond: Thread.Condition,
-		push_cond: Thread.Condition,
-		queue_end: usize,
+		read_cond: Thread.Condition,
+		write_cond: Thread.Condition,
 
 		const Self = @This();
 
@@ -60,29 +64,26 @@ pub fn ThreadPool(comptime F: anytype) type {
 			const thread_pool = try allocator.create(Self);
 
 			thread_pool.* = .{
-				.pull = 0,
-				.push = 0,
-				.pending = 0,
+				.tail = 0,
+				.head = 0,
 				.mutex = .{},
 				.stopped = false,
 				.queue = queue,
-				.pull_cond = .{},
-				.push_cond = .{},
+				.read_cond = .{},
+				.write_cond = .{},
 				.threads = threads,
-				.queue_end = queue.len - 1,
 			};
 
 			var started: usize = 0;
 			errdefer {
 				thread_pool.stopped = true;
-				thread_pool.pull_cond.broadcast();
+				thread_pool.read_cond.broadcast();
 				for (0..started) |i| {
 					threads[i].join();
 				}
 			}
 
 			for (0..threads.len) |i| {
-				// This becomes owned by the thread, it'll free it as it ends
 				const buffer = try allocator.alloc(u8, opts.buffer_size);
 				threads[i] = try Thread.spawn(.{}, Self.worker, .{ thread_pool, buffer });
 				started += 1;
@@ -96,7 +97,7 @@ pub fn ThreadPool(comptime F: anytype) type {
 			self.stopped = true;
 			self.mutex.unlock();
 
-			self.pull_cond.broadcast();
+			self.read_cond.broadcast();
 			for (self.threads) |thrd| {
 				thrd.join();
 			}
@@ -105,51 +106,61 @@ pub fn ThreadPool(comptime F: anytype) type {
 		pub fn empty(self: *Self) bool {
 			self.mutex.lock();
 			defer self.mutex.unlock();
-			return self.pull == self.push;
+			return self.head == self.tail;
 		}
 
 		pub fn spawn(self: *Self, args: Args) void {
 			const queue = self.queue;
-			const len = queue.len;
+			const queue_end = queue.len - 1;
 
 			self.mutex.lock();
-			while (self.pending == len) {
-				self.push_cond.wait(&self.mutex);
+			while (self.isFull(queue_end)) {
+				self.write_cond.wait(&self.mutex);
 			}
-			const push = self.push;
-			self.queue[push] = args;
-			self.push = if (push == self.queue_end) 0 else push + 1;
-			self.pending += 1;
+			const head = self.head;
+			queue[head] = args;
+			self.head = if (head == queue_end) 0 else head + 1;
 			self.mutex.unlock();
-			self.pull_cond.signal();
+			self.read_cond.signal();
+		}
+
+		// must be called with the mutex held
+		inline fn isFull(self: *Self, queue_end: usize) bool {
+			const tail = self.tail;
+			const head = self.head;
+			if (tail == 0) {
+				return head == queue_end;
+			}
+			return head == tail - 1;
 		}
 
+		// Having a re-usable buffer per thread is the most efficient way
+		// we can do any dynamic allocations. We'll pair this later with
+		// a FallbackAllocator. The main issue is that some data must outlive
+		// the worker thread (in nonblocking mode), but this isn't something
+		// we need to worry about here. As far as this worker thread is
+		// concerned, it has a chunk of memory (buffer) which it'll pass
+		// to the callback function to do with as it wants.
 		fn worker(self: *Self, buffer: []u8) void {
-			// Having a re-usable buffer per thread is the most efficient way
-			// we can do any dynamic allocations. We'll pair this later with
-			// a FallbackAllocator. The main issue is that some data must outlive
-			// the worker thread (in nonblocking mode), but this isn't something
-			// we need to worry about here. As far as this worker thread is
-			// concerned, it has a chunk of memory (buffer) which it'll pass
-			// to the callback function to do with as it wants.
+			const queue = self.queue;
+			const queue_end = queue.len - 1;
+
 			while (true) {
 				self.mutex.lock();
-				while (self.pending == 0) {
+				while (self.tail == self.head) {
 					if (self.stopped) {
 						self.mutex.unlock();
 						return;
 					}
-					self.pull_cond.wait(&self.mutex);
+					self.read_cond.wait(&self.mutex);
 				}
-				const pull = self.pull;
-				const args = self.queue[pull];
-				self.pull = if (pull == self.queue_end) 0 else pull + 1;
-				self.pending -= 1;
+				const tail = self.tail;
+				const args = queue[tail];
+				self.tail = if (tail == queue_end) 0 else tail + 1;
				self.mutex.unlock();
-				self.push_cond.signal();
+				self.write_cond.signal();
 
 				// convert Args to FullArgs, i.e. inject buffer as the last argument
 				var full_args: FullArgs = undefined;
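The `isFull` check added above is the standard one-slot-sacrifice convention for a ring buffer: `head == tail` is reserved to mean "empty", so the queue must report "full" while one slot is still unused, otherwise a completely full queue would be indistinguishable from an empty one. The trade-off against the removed `pending` counter is one wasted slot, but `empty` and `isFull` then need only `head` and `tail`, so `spawn` and `worker` maintain one less shared field under the lock. Below is a minimal standalone sketch of that convention, mirroring the logic from the diff; the `isEmpty`/`isFull` helpers and the test are illustrative and not part of the patch:

```zig
const std = @import("std");

// head == tail means empty, so one slot is sacrificed: the buffer is
// "full" when advancing head once more would make it equal to tail.
fn isEmpty(head: usize, tail: usize) bool {
    return head == tail;
}

// queue_end is queue.len - 1, matching the diff above.
fn isFull(head: usize, tail: usize, queue_end: usize) bool {
    if (tail == 0) return head == queue_end;
    return head == tail - 1;
}

test "ring buffer full/empty convention" {
    const queue_end: usize = 3; // a 4-slot queue holds at most 3 jobs
    try std.testing.expect(isEmpty(0, 0));
    try std.testing.expect(!isFull(0, 0, queue_end));
    try std.testing.expect(isFull(3, 0, queue_end)); // head wrapped to the end, tail at 0
    try std.testing.expect(isFull(1, 2, queue_end)); // head one behind tail
    try std.testing.expect(!isEmpty(2, 1));
}
```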