Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIBD_IMPL] PIBD Stats + Retry on validation errors #3694

Merged
merged 4 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/handlers/chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl ChainResetHandler {
pub fn reset_chain_head(&self, hash: Hash) -> Result<(), Error> {
let chain = w(&self.chain)?;
let header = chain.get_block_header(&hash)?;
chain.reset_chain_head(&header)?;
chain.reset_chain_head(&header, true)?;

// Reset the sync status and clear out any sync error.
w(&self.sync_state)?.reset();
Expand Down
48 changes: 40 additions & 8 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ impl Chain {
/// Reset both head and header_head to the provided header.
/// Handles simple rewind and more complex fork scenarios.
/// Used by the reset_chain_head owner api endpoint.
pub fn reset_chain_head<T: Into<Tip>>(&self, head: T) -> Result<(), Error> {
pub fn reset_chain_head<T: Into<Tip>>(
&self,
head: T,
rewind_headers: bool,
) -> Result<(), Error> {
let head = head.into();

let mut header_pmmr = self.header_pmmr.write();
Expand All @@ -248,19 +252,42 @@ impl Chain {
},
)?;

// If the rewind of full blocks was successful then we can rewind the header MMR.
// Rewind and reapply headers to reset the header MMR.
txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
self.rewind_and_apply_header_fork(&header, ext, batch)?;
batch.save_header_head(&head)?;
Ok(())
})?;
if rewind_headers {
// If the rewind of full blocks was successful then we can rewind the header MMR.
// Rewind and reapply headers to reset the header MMR.
txhashset::header_extending(&mut header_pmmr, &mut batch, |ext, batch| {
self.rewind_and_apply_header_fork(&header, ext, batch)?;
batch.save_header_head(&head)?;
Ok(())
})?;
}

batch.commit()?;

Ok(())
}

/// Reset prune lists (when PIBD resets)
///
/// Clears the output and rangeproof prune lists via a txhashset
/// extension so that a restarted PIBD sync begins from a clean slate.
pub fn reset_prune_lists(&self) -> Result<(), Error> {
	// Take both write locks up front so the reset cannot interleave
	// with other txhashset activity.
	let mut header_pmmr = self.header_pmmr.write();
	let mut txhashset = self.txhashset.write();
	let mut batch = self.store.batch()?;

	txhashset::extending(&mut header_pmmr, &mut txhashset, &mut batch, |ext, _| {
		let extension = &mut ext.extension;
		// Resets the prune lists on the output and rangeproof PMMRs.
		extension.reset_prune_lists();
		Ok(())
	})?;
	// NOTE(review): `batch` is dropped here without an explicit commit,
	// unlike the other batches in this change set — presumably the
	// prune-list reset is persisted at the file level by the PMMR
	// backend rather than through the db batch; confirm this is intended.
	Ok(())
}

/// Reset PIBD head (when PIBD resets)
///
/// Rewinds the persisted PIBD head back to the genesis tip so a
/// restarted PIBD sync re-requests segments from the beginning.
pub fn reset_pibd_head(&self) -> Result<(), Error> {
	let batch = self.store.batch()?;
	batch.save_pibd_head(&self.genesis().into())?;
	// Commit the batch explicitly — without this the saved head is
	// silently discarded when the batch is dropped. This matches the
	// explicit commits used elsewhere (e.g. reset_chain_head and the
	// desegmenter's save_pibd_head + commit sequence).
	batch.commit()?;
	Ok(())
}

/// Are we running with archive_mode enabled?
pub fn archive_mode(&self) -> bool {
self.archive_mode
Expand All @@ -276,6 +303,11 @@ impl Chain {
self.txhashset.clone()
}

/// return genesis header
pub fn genesis(&self) -> BlockHeader {
self.genesis.clone()
}

/// Shared store instance.
pub fn store(&self) -> Arc<store::ChainStore> {
self.store.clone()
Expand Down
31 changes: 22 additions & 9 deletions chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,20 @@ impl Desegmenter {
retval
}

/// Reset all state back to its initial (pre-sync) values.
pub fn reset(&mut self) {
	// Discard any partially downloaded segments.
	self.bitmap_segment_cache = Vec::new();
	self.output_segment_cache = Vec::new();
	self.rangeproof_segment_cache = Vec::new();
	self.kernel_segment_cache = Vec::new();

	// Clear derived bitmap state.
	self.bitmap_cache = None;
	self.bitmap_accumulator = BitmapAccumulator::new();
	self.bitmap_mmr_leaf_count = 0;
	self.bitmap_mmr_size = 0;

	self.all_segments_complete = false;

	// Recompute expected bitmap MMR sizes after zeroing them above.
	self.calc_bitmap_mmr_sizes();
}

/// Return reference to the header used for validation
pub fn header(&self) -> &BlockHeader {
&self.archive_header
Expand Down Expand Up @@ -225,6 +239,12 @@ impl Desegmenter {
let batch = store.batch().unwrap();
batch.save_pibd_head(&tip).unwrap();
batch.commit().unwrap();
status.update_pibd_progress(
false,
false,
latest_block_height,
header_head.height,
);
if h == header_head {
// get out of this loop and move on to validation
break;
Expand All @@ -240,10 +260,10 @@ impl Desegmenter {
header_pmmr,
&header_head,
genesis,
status,
status.clone(),
) {
error!("Error validating pibd hashset: {}", e);
// TODO: Set state appropriately, state sync can rewind and start again, etc
status.update_pibd_progress(false, true, latest_block_height, header_head.height);
}
stop_state.stop();
}
Expand All @@ -267,13 +287,6 @@ impl Desegmenter {
txhashset.roots().validate(header_head)?;
}

//debug!("desegmenter validation: compacting");
/*{
let mut txhashset = txhashset.write();
let batch = store.batch()?;
txhashset.compact(header_head, &batch)?;
}*/

status.on_setup();

// Validate kernel history
Expand Down
6 changes: 6 additions & 0 deletions chain/src/txhashset/txhashset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,12 @@ impl<'a> Extension<'a> {
self.rproof_pmmr.readonly_pmmr()
}

/// Reset the prune lists on both the rangeproof and output PMMRs.
/// Used when a PIBD sync is restarted from scratch.
pub fn reset_prune_lists(&mut self) {
	self.rproof_pmmr.reset_prune_list();
	self.output_pmmr.reset_prune_list();
}

/// Apply a new block to the current txhashet extension (output, rangeproof, kernel MMRs).
/// Returns a vec of commit_pos representing the pos and height of the outputs spent
/// by this block.
Expand Down
23 changes: 23 additions & 0 deletions chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ pub enum SyncStatus {
/// Whether the syncer has determined there's not enough
/// data to continue via PIBD
aborted: bool,
/// whether we got an error anywhere (in which case restart the process)
errored: bool,
/// 'height', i.e. last 'block' for which there is complete
/// pmmr data
completed_to_height: u64,
/// Total 'height' needed
required_height: u64,
},
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
Expand Down Expand Up @@ -217,6 +224,22 @@ impl SyncState {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
}

/// Update PIBD progress
///
/// Replaces the current sync status with a `TxHashsetPibd` entry
/// carrying the supplied abort/error flags and height progress.
pub fn update_pibd_progress(
	&self,
	aborted: bool,
	errored: bool,
	completed_to_height: u64,
	required_height: u64,
) {
	// Build the new status first, then swap it in under the write lock.
	let status = SyncStatus::TxHashsetPibd {
		aborted,
		errored,
		completed_to_height,
		required_height,
	};
	*self.current.write() = status;
}

/// Update PIBD segment list
pub fn add_pibd_segment(&self, id: &SegmentTypeIdentifier) {
self.requested_pibd_segments.write().push(id.clone());
Expand Down
3 changes: 3 additions & 0 deletions core/src/core/pmmr/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub trait Backend<T: PMMRable> {
/// Release underlying datafiles and locks
fn release_files(&mut self);

/// Reset prune list, used when PIBD is reset
fn reset_prune_list(&mut self);

/// Saves a snapshot of the rewound utxo file with the block hash as
/// filename suffix. We need this when sending a txhashset zip file to a
/// node for fast sync.
Expand Down
5 changes: 5 additions & 0 deletions core/src/core/pmmr/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ where
Ok(())
}

/// Reset prune list, used when PIBD is reset.
/// Delegates directly to the underlying backend.
pub fn reset_prune_list(&mut self) {
	self.backend.reset_prune_list();
}

/// Remove the specified position from the leaf set
pub fn remove_from_leaf_set(&mut self, pos0: u64) {
self.backend.remove_from_leaf_set(pos0);
Expand Down
4 changes: 4 additions & 0 deletions core/src/core/pmmr/vec_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
unimplemented!()
}

// Not supported by this in-memory backend; panics if called.
// NOTE(review): matches the neighbouring unimplemented trait methods —
// presumably VecBackend is only used where pruning never occurs; confirm.
fn reset_prune_list(&mut self) {
	unimplemented!()
}

fn rewind(&mut self, position: u64, _rewind_rm_pos: &Bitmap) -> Result<(), String> {
if let Some(data) = &mut self.data {
let idx = pmmr::n_leaves(position);
Expand Down
2 changes: 1 addition & 1 deletion servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ where
{
let res = d.add_bitmap_segment(segment, output_root);
if let Err(e) = res {
debug!(
error!(
"Validation of incoming bitmap segment failed: {:?}, reason: {}",
identifier, e
);
Expand Down
39 changes: 36 additions & 3 deletions servers/src/grin/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ impl StateSync {
}
};

// Check whether we've errored and should restart pibd
if using_pibd {
if let SyncStatus::TxHashsetPibd { errored: true, .. } = self.sync_state.status() {
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
error!("PIBD Reported Failure - Restarting Sync");
// reset desegmenter state
let desegmenter = self
.chain
.desegmenter(&archive_header, self.sync_state.clone())
.unwrap();

if let Some(d) = desegmenter.write().as_mut() {
d.reset();
};
if let Err(e) = self.chain.reset_chain_head(self.chain.genesis(), false) {
error!("pibd_sync restart: chain reset error = {}", e);
}
if let Err(e) = self.chain.reset_pibd_head() {
error!("pibd_sync restart: reset pibd_head error = {}", e);
}
if let Err(e) = self.chain.reset_prune_lists() {
error!("pibd_sync restart: reset prune lists error = {}", e);
}
self.sync_state.update_pibd_progress(false, false, 1, 1);
sync_need_restart = true;
}
}

// check peer connection status of this sync
if !using_pibd {
if let Some(ref peer) = self.state_sync_peer {
Expand Down Expand Up @@ -129,11 +157,14 @@ impl StateSync {
// run fast sync if applicable, normally only run one-time, except restart in error
if sync_need_restart || header_head.height == highest_height {
if using_pibd {
if sync_need_restart {
return true;
}
let (launch, _download_timeout) = self.state_sync_due();
if launch {
self.sync_state
.update(SyncStatus::TxHashsetPibd { aborted: false });
let archive_header = self.chain.txhashset_archive_header_header_only().unwrap();
self.sync_state
.update_pibd_progress(false, false, 1, archive_header.height);
let desegmenter = self
.chain
.desegmenter(&archive_header, self.sync_state.clone())
Expand Down Expand Up @@ -195,7 +226,9 @@ impl StateSync {
if let Some(d) = de.as_mut() {
let res = d.apply_next_segments();
if let Err(e) = res {
debug!("error applying segment, continuing: {}", e);
debug!("error applying segment: {}", e);
self.sync_state.update_pibd_progress(false, true, 1, 1);
return false;
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions src/bin/tui/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,21 @@ impl TUIStatusView {
};
Cow::Owned(format!("Sync step 1/7: Downloading headers: {}%", percent))
}
SyncStatus::TxHashsetPibd { .. } => {
Cow::Borrowed("Sync step 2/7: Performing PIBD Body Sync (experimental)")
SyncStatus::TxHashsetPibd {
aborted: _,
errored: _,
completed_to_height,
required_height,
} => {
let percent = if required_height == 0 {
0
} else {
completed_to_height * 100 / required_height
};
Cow::Owned(format!(
"Sync step 2/7: Downloading chain state - {} / {} Blocks - {}%",
completed_to_height, required_height, percent
))
}
SyncStatus::TxHashsetDownload(stat) => {
if stat.total_size > 0 {
Expand Down
8 changes: 8 additions & 0 deletions store/src/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,14 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
Ok(())
}

/// Replace the prune list with a fresh, empty one and persist it.
fn reset_prune_list(&mut self) {
	// An empty bitmap produces an empty prune list.
	let path = self.data_dir.join(PMMR_PRUN_FILE);
	self.prune_list = PruneList::new(Some(path), Bitmap::create());
	// The Backend trait offers no way to surface this error, so log it.
	if let Err(e) = self.prune_list.flush() {
		error!("Flushing reset prune list: {}", e);
	}
}

/// Remove by insertion position.
fn remove(&mut self, pos0: u64) -> Result<(), String> {
assert!(self.prunable, "Remove on non-prunable MMR");
Expand Down