fix(adb/threadpool): remove waitgroups and fix segfault #522

Merged
98 changes: 29 additions & 69 deletions src/accountsdb/db.zig
@@ -294,13 +294,14 @@ pub const AccountsDB = struct {
try self.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
self.logger.info().logf("fastload: total time: {s}", .{timer.read()});
} else {
const load_duration = try self.loadFromSnapshot(
var load_timer = try sig.time.Timer.start();
try self.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
n_threads,
allocator,
accounts_per_file_estimate,
);
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_duration});
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_timer.read()});
}

// no need to re-save if we just loaded from a fastload
@@ -404,7 +405,7 @@ pub const AccountsDB = struct {
/// needs to be a thread-safe allocator
per_thread_allocator: std.mem.Allocator,
accounts_per_file_estimate: u64,
) !sig.time.Duration {
) !void {
self.logger.info().log("running loadFromSnapshot...");

// used to read account files
@@ -429,40 +430,18 @@
bhs.accumulate(snapshot_manifest.bank_hash_info.stats);
}

var timer = try sig.time.Timer.start();
// short path
if (n_threads == 1) {
try self.loadAndVerifyAccountsFiles(
accounts_dir,
accounts_per_file_estimate,
snapshot_manifest.file_map,
0,
n_account_files,
true,
);

// if geyser, send end of data signal
if (self.geyser_writer) |geyser_writer| {
const end_of_snapshot: sig.geyser.core.VersionedAccountPayload = .EndOfSnapshotLoading;
try geyser_writer.writePayloadToPipe(end_of_snapshot);
}

return timer.read();
}

// setup the parallel indexing
const loading_threads = try self.allocator.alloc(AccountsDB, n_parse_threads);
defer self.allocator.free(loading_threads);

try initLoadingThreads(per_thread_allocator, loading_threads, self);
defer deinitLoadingThreads(per_thread_allocator, loading_threads);

self.logger.info().logf("[{d} threads]: reading and indexing accounts...", .{n_parse_threads});
{
var wg: std.Thread.WaitGroup = .{};
defer wg.wait();
try spawnThreadTasks(loadAndVerifyAccountsFilesMultiThread, .{
.wg = &wg,
self.logger.info().logf("[{d} threads]: running loadAndVerifyAccountsFiles...", .{n_parse_threads});
try spawnThreadTasks(
self.allocator,
loadAndVerifyAccountsFilesMultiThread,
.{
.data_len = n_account_files,
.max_threads = n_parse_threads,
.params = .{
@@ -471,8 +450,8 @@
snapshot_manifest.file_map,
accounts_per_file_estimate,
},
});
}
},
);

// if geyser, send end of data signal
if (self.geyser_writer) |geyser_writer| {
@@ -483,9 +462,6 @@
var merge_timer = try sig.time.Timer.start();
try self.mergeMultipleDBs(loading_threads, n_combine_threads);
self.logger.debug().logf("mergeMultipleDBs: total time: {}", .{merge_timer.read()});

self.logger.debug().logf("loadFromSnapshot: total time: {s}", .{timer.read()});
return timer.read();
}

/// Initializes a slice of children `AccountsDB`s, used to divide the work of loading from a snapshot.
@@ -777,10 +753,7 @@
) !void {
self.logger.info().logf("[{d} threads]: running mergeMultipleDBs...", .{n_threads});

var merge_indexes_wg: std.Thread.WaitGroup = .{};
defer merge_indexes_wg.wait();
try spawnThreadTasks(mergeThreadIndexesMultiThread, .{
.wg = &merge_indexes_wg,
try spawnThreadTasks(self.allocator, mergeThreadIndexesMultiThread, .{
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
@@ -954,35 +927,20 @@

// split processing the bins over multiple threads
self.logger.info().logf(
"collecting hashes from accounts using {} threads...",
"[{} threads] collecting hashes from accounts",
.{n_threads},
);
if (n_threads == 1) {
try getHashesFromIndex(
try spawnThreadTasks(self.allocator, getHashesFromIndexMultiThread, .{
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
self,
config,
self.account_index.pubkey_ref_map.shards,
self.allocator,
&hashes[0],
&lamports[0],
true,
);
} else {
var wg: std.Thread.WaitGroup = .{};
defer wg.wait();
try spawnThreadTasks(getHashesFromIndexMultiThread, .{
.wg = &wg,
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
self,
config,
self.allocator,
hashes,
lamports,
},
});
}
hashes,
lamports,
},
});
self.logger.debug().logf("collecting hashes from accounts took: {s}", .{timer.read()});
timer.reset();

@@ -3215,7 +3173,7 @@ pub fn getAccountPerFileEstimateFromCluster(
cluster: sig.core.Cluster,
) error{NotImplementedYet}!u64 {
return switch (cluster) {
.testnet => 1_000,
.testnet => 500,
else => error.NotImplementedYet,
};
}
@@ -3267,7 +3225,7 @@ fn testWriteSnapshotFull(
var snap_fields = try SnapshotManifest.decodeFromBincode(allocator, manifest_file.reader());
defer snap_fields.deinit(allocator);

_ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 1_500);
try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 500);

const snapshot_gen_info = try accounts_db.generateFullSnapshot(.{
.target_slot = slot,
@@ -3306,7 +3264,7 @@ fn testWriteSnapshotIncremental(
var snap_fields = try SnapshotManifest.decodeFromBincode(allocator, manifest_file.reader());
defer snap_fields.deinit(allocator);

_ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 1_500);
try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 500);

const snapshot_gen_info = try accounts_db.generateIncrementalSnapshot(.{
.target_slot = slot,
@@ -3474,7 +3432,7 @@ fn loadTestAccountsDB(
});
errdefer accounts_db.deinit();

_ = try accounts_db.loadFromSnapshot(
try accounts_db.loadFromSnapshot(
manifest.accounts_db_fields,
n_threads,
allocator,
@@ -3543,7 +3501,7 @@ test "geyser stream on load" {
});
defer accounts_db.deinit();

_ = try accounts_db.loadFromSnapshot(
try accounts_db.loadFromSnapshot(
snapshot.accounts_db_fields,
1,
allocator,
@@ -4487,12 +4445,14 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
});
defer accounts_db.deinit();

const loading_duration = try accounts_db.loadFromSnapshot(
var load_timer = try sig.time.Timer.start();
try accounts_db.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
bench_args.n_threads,
allocator,
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);
const loading_duration = load_timer.read();

const fastload_save_duration = blk: {
var timer = try sig.time.Timer.start();
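The core of the PR is visible in the call sites above: `spawnThreadTasks` now takes an allocator and only returns once every worker thread has been joined, so callers no longer thread a `std.Thread.WaitGroup` through the config, the `n_threads == 1` short path becomes unnecessary, and `loadFromSnapshot` stops returning a `sig.time.Duration` (callers that still want the timing start their own `sig.time.Timer`, as the init and benchmark hunks show). A minimal sketch of that contract, assuming the contiguous chunking scheme used elsewhere in this PR; `spawnChunkedTasks` and `taskFn` are illustrative names, not sig's API:

```zig
const std = @import("std");

/// Sketch (not sig's implementation): split `data_len` items into contiguous
/// chunks, spawn one thread per chunk, and join them all before returning.
/// Because the call only returns after every worker is joined, callers need
/// no WaitGroup.
fn spawnChunkedTasks(
    allocator: std.mem.Allocator,
    data_len: usize,
    max_threads: usize,
    context: anytype,
    comptime taskFn: fn (@TypeOf(context), usize, usize) void,
) !void {
    const n_threads = @min(max_threads, data_len);
    const threads = try allocator.alloc(std.Thread, n_threads);
    defer allocator.free(threads);

    var spawned: usize = 0;
    defer {
        // join everything that was spawned, even if a later spawn failed
        for (threads[0..spawned]) |t| t.join();
    }

    for (0..n_threads) |i| {
        // contiguous split: thread i handles items [start, end)
        const start = (data_len * i) / n_threads;
        const end = (data_len * (i + 1)) / n_threads;
        threads[i] = try std.Thread.spawn(.{}, taskFn, .{ context, start, end });
        spawned += 1;
    }
}
```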
1 change: 0 additions & 1 deletion src/cmd.zig
@@ -1501,7 +1501,6 @@ fn loadSnapshot(
break :blk cli_n_threads_snapshot_load;
}
};
logger.info().logf("n_threads_snapshot_load: {d}", .{n_threads_snapshot_load});

var accounts_db = try AccountsDB.init(.{
.allocator = allocator,
7 changes: 3 additions & 4 deletions src/prometheus/registry.zig
@@ -238,13 +238,12 @@ pub fn Registry(comptime options: RegistryOptions) type {
if (self.nbMetrics() >= options.max_metrics) return error.TooManyMetrics;
if (name.len > options.max_name_len) return error.NameTooLong;

var allocator = self.arena_state.allocator();

const duped_name = try allocator.dupe(u8, name);

self.mutex.lock();
defer self.mutex.unlock();

const allocator = self.arena_state.allocator();
const duped_name = try allocator.dupe(u8, name);

const gop = try self.metrics.getOrPut(allocator, duped_name);
if (!gop.found_existing) {
var real_metric = try allocator.create(MetricType);
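The registry fix is about lock coverage rather than allocation order per se: `self.arena_state.allocator()` was used to dupe the metric name before the mutex was taken, but `std.heap.ArenaAllocator` is not thread-safe, so two threads registering metrics concurrently could race inside the arena. Moving the dupe under the lock closes that window. A minimal sketch of the corrected pattern, with illustrative names (`NameInterner` is not sig's Registry):

```zig
const std = @import("std");

/// Sketch of the fixed pattern: every touch of the non-thread-safe
/// ArenaAllocator happens while the mutex is held.
const NameInterner = struct {
    mutex: std.Thread.Mutex = .{},
    arena_state: std.heap.ArenaAllocator,
    names: std.StringHashMapUnmanaged(void) = .{},

    fn intern(self: *NameInterner, name: []const u8) ![]const u8 {
        self.mutex.lock();
        defer self.mutex.unlock();

        // dupe under the lock: concurrent arena growth from two
        // threads corrupts the arena's internal state
        const allocator = self.arena_state.allocator();
        const duped = try allocator.dupe(u8, name);
        try self.names.put(allocator, duped, {});
        return duped;
    }
};
```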
7 changes: 4 additions & 3 deletions src/shred_network/repair_service.zig
@@ -102,6 +102,7 @@ pub const RepairService = struct {
peer_provider: RepairPeerProvider,
shred_tracker: *BasicShredTracker,
) !Self {
const n_threads = maxRequesterThreads();
return RepairService{
.allocator = allocator,
.requester = requester,
@@ -110,7 +111,7 @@
.logger = logger.withScope(@typeName(Self)),
.exit = exit,
.report = MultiSlotReport.init(allocator),
.thread_pool = RequestBatchThreadPool.init(allocator, maxRequesterThreads()),
.thread_pool = try RequestBatchThreadPool.init(allocator, n_threads, n_threads),
.metrics = try registry.initStruct(Metrics),
.prng = std.Random.DefaultPrng.init(0),
};
@@ -170,12 +171,12 @@
for (0..num_threads) |i| {
const start = (addressed_requests.items.len * i) / num_threads;
const end = (addressed_requests.items.len * (i + 1)) / num_threads;
try self.thread_pool.schedule(.{
self.thread_pool.schedule(.{
.requester = &self.requester,
.requests = addressed_requests.items[start..end],
});
try self.thread_pool.joinFallible();
}
try self.thread_pool.joinFallible();
}

return addressed_requests.items.len;
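Two things change here. First, the pool's `init` gains an argument and becomes fallible while `schedule` stops being fallible; from the call sites this looks like task capacity being preallocated up front, though that contract is an inference from the diff, not documented here. Second, the actual behavioral fix: `joinFallible` moves out of the loop. Joining after each `schedule` meant batch i completed before batch i+1 was ever queued, serializing the repair requests. Sketched below with bare `std.Thread` standing in for `RequestBatchThreadPool` (names hypothetical; a spawn failure mid-loop would leak the already-spawned threads, error handling being elided for brevity):

```zig
const std = @import("std");

// Stand-in for the pool: spawn every batch first, then join once, so the
// batches actually overlap.
fn sendAllBatches(
    allocator: std.mem.Allocator,
    requests: []const u32,
    num_threads: usize,
    comptime sendBatch: fn ([]const u32) void,
) !void {
    const threads = try allocator.alloc(std.Thread, num_threads);
    defer allocator.free(threads);

    for (threads, 0..) |*thread, i| {
        // same contiguous split as the repair service uses
        const start = (requests.len * i) / num_threads;
        const end = (requests.len * (i + 1)) / num_threads;
        thread.* = try std.Thread.spawn(.{}, sendBatch, .{requests[start..end]});
        // the old code called join here, inside the loop: one batch at a time
    }

    // the fix: a single join after all batches are scheduled
    for (threads) |thread| thread.join();
}
```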