Skip to content

Commit

Permalink
replay-verify: improve range allocation
Browse files Browse the repository at this point in the history
1. make range end_version inclusive.
2. replay-on-archive returns gracefully if range and db range doesn't
   overlap. (because the scheduler doesn't know actual db range)
  • Loading branch information
msmouse committed Mar 5, 2025
1 parent 732336d commit 317417e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 45 deletions.
54 changes: 31 additions & 23 deletions storage/db-tool/src/replay_on_archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aptos_config::config::{
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::{backup::backup_handler::BackupHandler, AptosDB};
use aptos_logger::{error, info};
use aptos_logger::{error, info, prelude::*};
use aptos_storage_interface::{
state_store::state_view::db_state_view::DbStateViewAtVersion, AptosDbError, DbReader,
};
Expand Down Expand Up @@ -151,7 +151,7 @@ impl Verifier {
Self::get_start_and_limit(&arc_db, config.start_version, config.end_version)?;
info!(
start_version = start,
end_version = start + limit,
limit = limit,
"Replaying transactions."
);
Ok(Self {
Expand All @@ -169,6 +169,11 @@ impl Verifier {

// Split the replay to multiple reply tasks running in parallel
pub fn run(self) -> Result<Vec<Error>> {
if self.limit == 0 {
info!("Nothing to verify.");
return Ok(vec![]);
}

AptosVM::set_concurrency_level_once(self.replay_concurrency_level);
let task_size = self.limit / self.concurrent_replay as u64;
let ranges: Vec<(u64, u64)> = (0..self.concurrent_replay)
Expand Down Expand Up @@ -260,28 +265,31 @@ impl Verifier {
start_version: Version,
end_version: Version,
) -> Result<(Version, u64)> {
let start_version = std::cmp::max(
aptos_db
.get_first_txn_version()?
.ok_or(AptosDbError::NotFound(
"First txn version is None".to_string(),
))?,
start_version,
);
let db_start = aptos_db
.get_first_txn_version()?
.ok_or(AptosDbError::NotFound(
"First txn version is None".to_string(),
))?;
let start = std::cmp::max(db_start, start_version);

let db_end = aptos_db
.get_synced_version()?
.ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?;
let end = std::cmp::min(end_version, db_end);

let limit = if start <= end {
end - start + 1
} else {
warn!(
start = start_version,
db_start = db_start,
end = end_version,
db_end = db_end,
"No transactions to verify in requested range."
);
0
};

let end_version = std::cmp::min(
aptos_db
.get_synced_version()?
.ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?,
end_version,
);
assert!(
start_version <= end_version,
"start_version {} must be less than or equal to end_version{}",
start_version,
end_version
);
let limit = end_version - start_version;
Ok((start_version, limit))
}

Expand Down
51 changes: 29 additions & 22 deletions testsuite/replay-verify/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

REPLAY_CONCURRENCY_LEVEL = 1

INT64_MAX = 9_223_372_036_854_775_807

class Network(Enum):
TESTNET = 1
Expand Down Expand Up @@ -363,34 +364,40 @@ def __str__(self):
def get_label(self):
return f"{self.id}-{self.network}"

def sorted_ranges_to_skip(self):
if len(self.ranges_to_skip) == 0:
return []

sorted_skips = [r for r in sorted(self.ranges_to_skip) if r[1] >= self.start_version]

for i in range(0, len(sorted_skips) - 1):
assert(
sorted_skips[i][0] <= sorted_skips[i][1] < sorted_skips[i + 1][0]
), f"overlapping skip ranges: {sorted_skips[i]} and {sorted_skips[i + 1]}"

return sorted_skips

def create_tasks(self) -> None:
current = self.start_version

sorted_skips = [
r
for r in sorted(self.ranges_to_skip, key=lambda x: x[0])
if r[0] > self.start_version
]
skips = self.sorted_ranges_to_skip()

while current < self.end_version:
while sorted_skips and sorted_skips[0][0] <= current < sorted_skips[0][1]:
current = sorted_skips[0][1] + 1
sorted_skips.pop(0)
while current <= self.end_version:
(skip_start, skip_end) = (INT64_MAX, INT64_MAX) if len(skips) == 0 else skips[0]
if skip_start <= current:
skips.pop(0)
current = skip_end + 1
continue

range_end = min(
(
current + self.range_size,
self.end_version,
sorted_skips[0][0] if sorted_skips else self.end_version,
)
)
next_current = min(current + self.range_size, self.end_version + 1, skip_start)

# avoid having too many small tasks, simply skip the task
if next_current - current >= self.config.min_range_size:
self.tasks.append((current, next_current - 1))

current = next_current

if current < range_end:
# avoid having too many small tasks, simply skip the task
if range_end - current >= self.config.min_range_size:
self.tasks.append((current, range_end))
current = range_end
logger.info(self.tasks)
logger.info(f"Task ranges: {self.tasks}")

def create_pvc_from_snapshot(self):
snapshot_name = (
Expand Down

0 comments on commit 317417e

Please sign in to comment.