Skip to content

Commit

Permalink
fix(adb/threadpool): remove waitgroups and fix segfault
Browse files Browse the repository at this point in the history
  • Loading branch information
0xNineteen committed Jan 30, 2025
1 parent 8345545 commit 34f99cb
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 111 deletions.
98 changes: 29 additions & 69 deletions src/accountsdb/db.zig
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,14 @@ pub const AccountsDB = struct {
try self.fastload(fastload_dir, collapsed_manifest.accounts_db_fields);
self.logger.info().logf("fastload: total time: {s}", .{timer.read()});
} else {
const load_duration = try self.loadFromSnapshot(
var load_timer = try sig.time.Timer.start();
try self.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
n_threads,
allocator,
accounts_per_file_estimate,
);
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_duration});
self.logger.info().logf("loadFromSnapshot: total time: {s}", .{load_timer.read()});
}

// no need to re-save if we just loaded from a fastload
Expand Down Expand Up @@ -404,7 +405,7 @@ pub const AccountsDB = struct {
/// needs to be a thread-safe allocator
per_thread_allocator: std.mem.Allocator,
accounts_per_file_estimate: u64,
) !sig.time.Duration {
) !void {
self.logger.info().log("running loadFromSnapshot...");

// used to read account files
Expand All @@ -429,40 +430,18 @@ pub const AccountsDB = struct {
bhs.accumulate(snapshot_manifest.bank_hash_info.stats);
}

var timer = try sig.time.Timer.start();
// short path
if (n_threads == 1) {
try self.loadAndVerifyAccountsFiles(
accounts_dir,
accounts_per_file_estimate,
snapshot_manifest.file_map,
0,
n_account_files,
true,
);

// if geyser, send end of data signal
if (self.geyser_writer) |geyser_writer| {
const end_of_snapshot: sig.geyser.core.VersionedAccountPayload = .EndOfSnapshotLoading;
try geyser_writer.writePayloadToPipe(end_of_snapshot);
}

return timer.read();
}

// setup the parallel indexing
const loading_threads = try self.allocator.alloc(AccountsDB, n_parse_threads);
defer self.allocator.free(loading_threads);

try initLoadingThreads(per_thread_allocator, loading_threads, self);
defer deinitLoadingThreads(per_thread_allocator, loading_threads);

self.logger.info().logf("[{d} threads]: reading and indexing accounts...", .{n_parse_threads});
{
var wg: std.Thread.WaitGroup = .{};
defer wg.wait();
try spawnThreadTasks(loadAndVerifyAccountsFilesMultiThread, .{
.wg = &wg,
self.logger.info().logf("[{d} threads]: running loadAndVerifyAccountsFiles...", .{n_parse_threads});
try spawnThreadTasks(
self.allocator,
loadAndVerifyAccountsFilesMultiThread,
.{
.data_len = n_account_files,
.max_threads = n_parse_threads,
.params = .{
Expand All @@ -471,8 +450,8 @@ pub const AccountsDB = struct {
snapshot_manifest.file_map,
accounts_per_file_estimate,
},
});
}
},
);

// if geyser, send end of data signal
if (self.geyser_writer) |geyser_writer| {
Expand All @@ -483,9 +462,6 @@ pub const AccountsDB = struct {
var merge_timer = try sig.time.Timer.start();
try self.mergeMultipleDBs(loading_threads, n_combine_threads);
self.logger.debug().logf("mergeMultipleDBs: total time: {}", .{merge_timer.read()});

self.logger.debug().logf("loadFromSnapshot: total time: {s}", .{timer.read()});
return timer.read();
}

/// Initializes a slice of children `AccountsDB`s, used to divide the work of loading from a snapshot.
Expand Down Expand Up @@ -777,10 +753,7 @@ pub const AccountsDB = struct {
) !void {
self.logger.info().logf("[{d} threads]: running mergeMultipleDBs...", .{n_threads});

var merge_indexes_wg: std.Thread.WaitGroup = .{};
defer merge_indexes_wg.wait();
try spawnThreadTasks(mergeThreadIndexesMultiThread, .{
.wg = &merge_indexes_wg,
try spawnThreadTasks(self.allocator, mergeThreadIndexesMultiThread, .{
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
Expand Down Expand Up @@ -954,35 +927,20 @@ pub const AccountsDB = struct {

// split processing the bins over multiple threads
self.logger.info().logf(
"collecting hashes from accounts using {} threads...",
"[{} threads] collecting hashes from accounts",
.{n_threads},
);
if (n_threads == 1) {
try getHashesFromIndex(
try spawnThreadTasks(self.allocator, getHashesFromIndexMultiThread, .{
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
self,
config,
self.account_index.pubkey_ref_map.shards,
self.allocator,
&hashes[0],
&lamports[0],
true,
);
} else {
var wg: std.Thread.WaitGroup = .{};
defer wg.wait();
try spawnThreadTasks(getHashesFromIndexMultiThread, .{
.wg = &wg,
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
self,
config,
self.allocator,
hashes,
lamports,
},
});
}
hashes,
lamports,
},
});
self.logger.debug().logf("collecting hashes from accounts took: {s}", .{timer.read()});
timer.reset();

Expand Down Expand Up @@ -3215,7 +3173,7 @@ pub fn getAccountPerFileEstimateFromCluster(
cluster: sig.core.Cluster,
) error{NotImplementedYet}!u64 {
return switch (cluster) {
.testnet => 1_000,
.testnet => 500,
else => error.NotImplementedYet,
};
}
Expand Down Expand Up @@ -3267,7 +3225,7 @@ fn testWriteSnapshotFull(
var snap_fields = try SnapshotManifest.decodeFromBincode(allocator, manifest_file.reader());
defer snap_fields.deinit(allocator);

_ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 1_500);
try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 500);

const snapshot_gen_info = try accounts_db.generateFullSnapshot(.{
.target_slot = slot,
Expand Down Expand Up @@ -3306,7 +3264,7 @@ fn testWriteSnapshotIncremental(
var snap_fields = try SnapshotManifest.decodeFromBincode(allocator, manifest_file.reader());
defer snap_fields.deinit(allocator);

_ = try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 1_500);
try accounts_db.loadFromSnapshot(snap_fields.accounts_db_fields, 1, allocator, 500);

const snapshot_gen_info = try accounts_db.generateIncrementalSnapshot(.{
.target_slot = slot,
Expand Down Expand Up @@ -3474,7 +3432,7 @@ fn loadTestAccountsDB(
});
errdefer accounts_db.deinit();

_ = try accounts_db.loadFromSnapshot(
try accounts_db.loadFromSnapshot(
manifest.accounts_db_fields,
n_threads,
allocator,
Expand Down Expand Up @@ -3543,7 +3501,7 @@ test "geyser stream on load" {
});
defer accounts_db.deinit();

_ = try accounts_db.loadFromSnapshot(
try accounts_db.loadFromSnapshot(
snapshot.accounts_db_fields,
1,
allocator,
Expand Down Expand Up @@ -4487,12 +4445,14 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
});
defer accounts_db.deinit();

const loading_duration = try accounts_db.loadFromSnapshot(
var load_timer = try sig.time.Timer.start();
try accounts_db.loadFromSnapshot(
collapsed_manifest.accounts_db_fields,
bench_args.n_threads,
allocator,
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);
const loading_duration = load_timer.read();

const fastload_save_duration = blk: {
var timer = try sig.time.Timer.start();
Expand Down
1 change: 0 additions & 1 deletion src/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,6 @@ fn loadSnapshot(
break :blk cli_n_threads_snapshot_load;
}
};
logger.info().logf("n_threads_snapshot_load: {d}", .{n_threads_snapshot_load});

var accounts_db = try AccountsDB.init(.{
.allocator = allocator,
Expand Down
5 changes: 3 additions & 2 deletions src/shred_network/repair_service.zig
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ pub const RepairService = struct {
peer_provider: RepairPeerProvider,
shred_tracker: *BasicShredTracker,
) !Self {
const n_threads = maxRequesterThreads();
return RepairService{
.allocator = allocator,
.requester = requester,
Expand All @@ -110,7 +111,7 @@ pub const RepairService = struct {
.logger = logger.withScope(@typeName(Self)),
.exit = exit,
.report = MultiSlotReport.init(allocator),
.thread_pool = RequestBatchThreadPool.init(allocator, maxRequesterThreads()),
.thread_pool = try RequestBatchThreadPool.init(allocator, n_threads, n_threads),
.metrics = try registry.initStruct(Metrics),
.prng = std.Random.DefaultPrng.init(0),
};
Expand Down Expand Up @@ -170,7 +171,7 @@ pub const RepairService = struct {
for (0..num_threads) |i| {
const start = (addressed_requests.items.len * i) / num_threads;
const end = (addressed_requests.items.len * (i + 1)) / num_threads;
try self.thread_pool.schedule(.{
self.thread_pool.schedule(.{
.requester = &self.requester,
.requests = addressed_requests.items[start..end],
});
Expand Down
Loading

0 comments on commit 34f99cb

Please sign in to comment.