replay-verify: improve range allocation
1. Make range end_version inclusive.
2. replay-on-archive returns gracefully if the requested range and the DB
   range don't overlap (because the scheduler doesn't know the actual DB range).
msmouse committed Mar 5, 2025
1 parent f63cef2 commit a48ffac
Showing 2 changed files with 68 additions and 46 deletions.
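
Both points in the commit message come down to one range computation: clamp the requested [start_version, end_version] window to the versions the DB actually holds, count transactions with end_version treated as inclusive, and report zero work instead of asserting when the two ranges don't overlap. A minimal Python sketch of that semantics (the function and the db_first/db_synced names are illustrative, not part of the patch):

def start_and_limit(db_first, db_synced, start_version, end_version):
    """Clamp the requested window to the DB range; end_version is inclusive."""
    start = max(db_first, start_version)
    end = min(db_synced, end_version)
    if start > end:
        # Requested range and DB range don't overlap: nothing to replay,
        # so return zero work instead of panicking.
        return start, 0
    return start, end - start + 1  # +1 because end_version is inclusive

# For a DB holding versions 100..=200:
assert start_and_limit(100, 200, 150, 200) == (150, 51)   # inclusive count
assert start_and_limit(100, 200, 300, 400) == (300, 0)    # no overlap, no work
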
54 changes: 31 additions & 23 deletions storage/db-tool/src/replay_on_archive.rs
@@ -10,7 +10,7 @@ use aptos_config::config::{
NO_OP_STORAGE_PRUNER_CONFIG,
};
use aptos_db::{backup::backup_handler::BackupHandler, AptosDB};
use aptos_logger::{error, info};
use aptos_logger::{error, info, prelude::*};
use aptos_storage_interface::{
state_store::state_view::db_state_view::DbStateViewAtVersion, AptosDbError, DbReader,
};
@@ -151,7 +151,7 @@ impl Verifier {
Self::get_start_and_limit(&arc_db, config.start_version, config.end_version)?;
info!(
start_version = start,
end_version = start + limit,
limit = limit,
"Replaying transactions."
);
Ok(Self {
@@ -169,6 +169,11 @@

// Split the replay into multiple replay tasks running in parallel
pub fn run(self) -> Result<Vec<Error>> {
if self.limit == 0 {
info!("Nothing to verify.");
return Ok(vec![]);
}

AptosVM::set_concurrency_level_once(self.replay_concurrency_level);
let task_size = self.limit / self.concurrent_replay as u64;
let ranges: Vec<(u64, u64)> = (0..self.concurrent_replay)
@@ -260,28 +265,31 @@ impl Verifier {
start_version: Version,
end_version: Version,
) -> Result<(Version, u64)> {
let start_version = std::cmp::max(
aptos_db
.get_first_txn_version()?
.ok_or(AptosDbError::NotFound(
"First txn version is None".to_string(),
))?,
start_version,
);
let db_start = aptos_db
.get_first_txn_version()?
.ok_or(AptosDbError::NotFound(
"First txn version is None".to_string(),
))?;
let start = std::cmp::max(db_start, start_version);

let db_end = aptos_db
.get_synced_version()?
.ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?;
let end = std::cmp::min(end_version, db_end);

let limit = if start <= end {
end - start + 1
} else {
warn!(
start = start_version,
db_start = db_start,
end = end_version,
db_end = db_end,
"No transactions to verify in requested range."
);
0
};

let end_version = std::cmp::min(
aptos_db
.get_synced_version()?
.ok_or(AptosDbError::NotFound("Synced version is None".to_string()))?,
end_version,
);
assert!(
start_version <= end_version,
"start_version {} must be less than or equal to end_version{}",
start_version,
end_version
);
let limit = end_version - start_version;
Ok((start_version, limit))
}

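
With the limit now counted inclusively, run() first bails out when there is nothing to verify (limit == 0) and otherwise splits the limit transactions across concurrent_replay workers; the splitting code itself is cut off in the hunk above. The sketch below shows one plausible way to derive contiguous, inclusive sub-ranges from (start, limit), as an illustration of the idea only, not the committed Rust:

def split_ranges(start, limit, workers):
    """Split `limit` versions beginning at `start` into at most `workers`
    contiguous (first, last) pairs with inclusive bounds."""
    if limit == 0:
        return []  # nothing to verify, mirroring the early return in run()
    task_size = (limit + workers - 1) // workers  # ceiling division
    last = start + limit - 1
    ranges = []
    cur = start
    while cur <= last:
        ranges.append((cur, min(cur + task_size - 1, last)))
        cur += task_size
    return ranges

# 101 versions starting at 100 split across 4 workers:
print(split_ranges(100, 101, 4))
# -> [(100, 125), (126, 151), (152, 177), (178, 200)]
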
60 changes: 37 additions & 23 deletions testsuite/replay-verify/main.py
@@ -2,6 +2,7 @@
from kubernetes import client, config as KubernetesConfig
from kubernetes.client.rest import ApiException
from google.cloud import storage
import datetime
import time
import logging
import os
@@ -30,6 +31,7 @@

REPLAY_CONCURRENCY_LEVEL = 1

INT64_MAX = 9_223_372_036_854_775_807

class Network(Enum):
TESTNET = 1
@@ -363,34 +365,46 @@ def __str__(self):
def get_label(self):
return f"{self.id}-{self.network}"

def sorted_ranges_to_skip(self):
if len(self.ranges_to_skip) == 0:
return []

sorted_skips = [list(r) for r in sorted(self.ranges_to_skip) if r[1] >= self.start_version]

# merge skip ranges
ret = []
current_skip = sorted_skips.pop(0)
for next_skip in sorted_skips:
if next_skip[0] > current_skip[1] + 1:
ret.append(current_skip)
current_skip = next_skip
else:
current_skip[1] = max(current_skip[1], next_skip[1])
ret.append(current_skip)

return ret

def create_tasks(self) -> None:
current = self.start_version

sorted_skips = [
r
for r in sorted(self.ranges_to_skip, key=lambda x: x[0])
if r[0] > self.start_version
]
skips = self.sorted_ranges_to_skip()

while current < self.end_version:
while sorted_skips and sorted_skips[0][0] <= current < sorted_skips[0][1]:
current = sorted_skips[0][1] + 1
sorted_skips.pop(0)
while current <= self.end_version:
(skip_start, skip_end) = (INT64_MAX, INT64_MAX) if len(skips) == 0 else skips[0]
if skip_start <= current:
skips.pop(0)
current = skip_end + 1
continue

range_end = min(
(
current + self.range_size,
self.end_version,
sorted_skips[0][0] if sorted_skips else self.end_version,
)
)
next_current = min(current + self.range_size, self.end_version + 1, skip_start)

# avoid having too many small tasks; simply skip ranges smaller than min_range_size
if next_current - current >= self.config.min_range_size:
self.tasks.append((current, next_current - 1))

current = next_current

if current < range_end:
# avoid having too many small tasks, simply skip the task
if range_end - current >= self.config.min_range_size:
self.tasks.append((current, range_end))
current = range_end
logger.info(self.tasks)
logger.info(f"Task ranges: {self.tasks}")

def create_pvc_from_snapshot(self):
snapshot_name = (
@@ -619,7 +633,7 @@ def print_logs(failed_workpod_logs: list[str], txn_mismatch_logs: list[str]) ->
get_kubectl_credentials("aptos-devinfra-0", "us-central1", "devinfra-usce1-0")
(start, end, skip_ranges) = read_skip_ranges(args.network)
image = get_image(args.image_tag) if args.image_tag is not None else get_image()
run_id = image[-5:]
run_id = f"{datetime.datetime.now().strftime('%Y%m%d-%H%M%S-')}-{image[-5:]}"
network = Network.from_string(args.network)
config = ReplayConfig(network)
worker_cnt = args.worker_cnt if args.worker_cnt else config.pvc_number * 7
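
To make the new task planning concrete, here is a standalone sketch that mirrors the create_tasks() loop above (and assumes the already merged, sorted skip list produced by sorted_ranges_to_skip); the input numbers and the helper name plan_tasks are illustrative only:

INT64_MAX = 9_223_372_036_854_775_807

def plan_tasks(start, end, range_size, min_range_size, skips):
    """Produce inclusive (first, last) task ranges covering [start, end],
    stepping over the merged, sorted skip ranges."""
    tasks, current = [], start
    skips = [list(s) for s in skips]
    while current <= end:
        skip_start, skip_end = skips[0] if skips else (INT64_MAX, INT64_MAX)
        if skip_start <= current:
            skips.pop(0)
            current = skip_end + 1  # resume right after the skipped range
            continue
        next_current = min(current + range_size, end + 1, skip_start)
        if next_current - current >= min_range_size:  # drop tiny tasks
            tasks.append((current, next_current - 1))
        current = next_current
    return tasks

# Overlapping skips (40, 50) and (45, 60) merge into (40, 60), which carves
# a hole out of [0, 99]; range_size=30, min_range_size=5:
print(plan_tasks(0, 99, 30, 5, [[40, 60]]))
# -> [(0, 29), (30, 39), (61, 90), (91, 99)]
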
