Skip to content

Commit

Permalink
simplify threadpool by removing the pending field and using the exist…
Browse files Browse the repository at this point in the history
…ing head/tail to figure out the state
  • Loading branch information
karlseguin committed Sep 4, 2024
1 parent c10334c commit 906be7e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ tb:

.PHONY: s
s:
zig build run -freference-trace
zig build example_1 -freference-trace

.phony: d
d:
Expand Down
89 changes: 50 additions & 39 deletions src/thread_pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ pub fn ThreadPool(comptime F: anytype) type {
});

return struct {
push: usize,
pull: usize,
stopped: bool,
pending: usize,
// position in queue to read from
tail: usize,

// position in the queue to write to
head: usize,

// pending jobs
queue: []Args,

stopped: bool,
threads: []Thread,
mutex: Thread.Mutex,
pull_cond: Thread.Condition,
push_cond: Thread.Condition,
queue_end: usize,
read_cond: Thread.Condition,
write_cond: Thread.Condition,

const Self = @This();

Expand All @@ -60,29 +64,26 @@ pub fn ThreadPool(comptime F: anytype) type {
const thread_pool = try allocator.create(Self);

thread_pool.* = .{
.pull = 0,
.push = 0,
.pending = 0,
.tail = 0,
.head = 0,
.mutex = .{},
.stopped = false,
.queue = queue,
.pull_cond = .{},
.push_cond = .{},
.read_cond = .{},
.write_cond = .{},
.threads = threads,
.queue_end = queue.len - 1,
};

var started: usize = 0;
errdefer {
thread_pool.stopped = true;
thread_pool.pull_cond.broadcast();
thread_pool.read_cond.broadcast();
for (0..started) |i| {
threads[i].join();
}
}

for (0..threads.len) |i| {
// This becomes owned by the thread, it'll free it as it ends
const buffer = try allocator.alloc(u8, opts.buffer_size);
threads[i] = try Thread.spawn(.{}, Self.worker, .{ thread_pool, buffer });
started += 1;
Expand All @@ -96,7 +97,7 @@ pub fn ThreadPool(comptime F: anytype) type {
self.stopped = true;
self.mutex.unlock();

self.pull_cond.broadcast();
self.read_cond.broadcast();
for (self.threads) |thrd| {
thrd.join();
}
Expand All @@ -105,51 +106,61 @@ pub fn ThreadPool(comptime F: anytype) type {
pub fn empty(self: *Self) bool {
self.mutex.lock();
defer self.mutex.unlock();
return self.pull == self.push;
return self.head == self.tail;
}

pub fn spawn(self: *Self, args: Args) void {
const queue = self.queue;
const len = queue.len;
const queue_end = queue.len - 1;

self.mutex.lock();
while (self.pending == len) {
self.push_cond.wait(&self.mutex);
while (self.isFull(queue_end)) {
self.write_cond.wait(&self.mutex);
}

const push = self.push;
self.queue[push] = args;
self.push = if (push == self.queue_end) 0 else push + 1;
self.pending += 1;
const head = self.head;
queue[head] = args;
self.head = if (head == queue_end) 0 else head + 1;
self.mutex.unlock();

self.pull_cond.signal();
self.read_cond.signal();
}

// Reports whether the queue has no free slot for another write.
// `queue_end` is the last valid index of the queue (queue.len - 1).
// Reads head and tail without synchronization, so the caller is
// assumed to already hold the mutex.
inline fn isFull(self: *Self, queue_end: usize) bool {
    // The queue counts as full when head sits in the slot immediately
    // before tail; when tail is at index 0 that slot wraps around to
    // the last index.
    const slot_before_tail = if (self.tail == 0) queue_end else self.tail - 1;
    return self.head == slot_before_tail;
}

// Having a re-usable buffer per thread is the most efficient way
// we can do any dynamic allocations. We'll pair this later with
// a FallbackAllocator. The main issue is that some data must outlive
// the worker thread (in nonblocking mode), but this isn't something
// we need to worry about here. As far as this worker thread is
// concerned, it has a chunk of memory (buffer) which it'll pass
// to the callback function to do with as it wants.
fn worker(self: *Self, buffer: []u8) void {
// Having a re-usable buffer per thread is the most efficient way
// we can do any dynamic allocations. We'll pair this later with
// a FallbackAllocator. The main issue is that some data must outlive
// the worker thread (in nonblocking mode), but this isn't something
// we need to worry about here. As far as this worker thread is
// concerned, it has a chunk of memory (buffer) which it'll pass
// to the callback function to do with as it wants.
const queue = self.queue;
const queue_end = queue.len - 1;

while (true) {
self.mutex.lock();
while (self.pending == 0) {
while (self.tail == self.head) {
if (self.stopped) {
self.mutex.unlock();
return;
}
self.pull_cond.wait(&self.mutex);
self.read_cond.wait(&self.mutex);
}
const pull = self.pull;
const args = self.queue[pull];
self.pull = if (pull == self.queue_end) 0 else pull + 1;
self.pending -= 1;
const tail = self.tail;
const args = queue[tail];
self.tail = if (tail == queue_end) 0 else tail + 1;
self.mutex.unlock();
self.push_cond.signal();
self.write_cond.signal();

// convert Args to FullArgs, i.e. inject buffer as the last argument
var full_args: FullArgs = undefined;
Expand Down

0 comments on commit 906be7e

Please sign in to comment.