Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track sync_head on header_sync sync status #3626

Merged
merged 4 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/src/handlers/server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ fn sync_status_to_api(sync_status: SyncStatus) -> (String, Option<serde_json::Va
SyncStatus::NoSync => ("no_sync".to_string(), None),
SyncStatus::AwaitingPeers(_) => ("awaiting_peers".to_string(), None),
SyncStatus::HeaderSync {
current_height,
sync_head,
highest_height,
..
} => (
"header_sync".to_string(),
Some(json!({ "current_height": current_height, "highest_height": highest_height })),
Some(json!({ "current_height": sync_head.height, "highest_height": highest_height })),
),
SyncStatus::TxHashsetDownload(stats) => (
"txhashset_download".to_string(),
Expand Down
40 changes: 26 additions & 14 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,19 +404,23 @@ impl Chain {
/// Attempt to add new headers to the header chain (or fork).
/// This is only ever used during sync and is based on sync_head.
/// We update header_head here if our total work increases.
pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> {
/// Returns the new sync_head (may temporarily diverge from header_head when syncing a long fork).
pub fn sync_block_headers(
&self,
headers: &[BlockHeader],
sync_head: Tip,
opts: Options,
) -> Result<Option<Tip>, Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;

// Sync the chunk of block headers, updating header_head if total work increases.
{
let batch = self.store.batch()?;
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
pipe::process_block_headers(headers, &mut ctx)?;
ctx.batch.commit()?;
}
let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;
let sync_head = pipe::process_block_headers(headers, sync_head, &mut ctx)?;
ctx.batch.commit()?;

Ok(())
Ok(sync_head)
}

/// Build a new block processing context.
Expand Down Expand Up @@ -1410,12 +1414,20 @@ impl Chain {
}

/// Gets multiple headers at the provided heights.
pub fn get_locator_hashes(&self, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let pmmr = self.header_pmmr.read();
heights
.iter()
.map(|h| pmmr.get_header_hash_by_height(*h))
.collect()
/// Note: This is based on the provided sync_head to support syncing against a fork.
pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let mut header_pmmr = self.header_pmmr.write();
txhashset::header_extending_readonly(&mut header_pmmr, &self.store(), |ext, batch| {
let header = batch.get_block_header(&sync_head.hash())?;
pipe::rewind_and_apply_header_fork(&header, ext, batch)?;

let hashes = heights
.iter()
.filter_map(|h| ext.get_header_hash_by_height(*h))
.collect();

Ok(hashes)
})
}

/// Builds an iterator on blocks starting from the current chain head and
Expand Down
62 changes: 31 additions & 31 deletions chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,48 +176,52 @@ pub fn process_block(

/// Process a batch of sequential block headers.
/// This is only used during header sync.
/// Will update header_head locally if this batch of headers increases total work.
/// Returns the updated sync_head, which may be on a fork.
pub fn process_block_headers(
headers: &[BlockHeader],
sync_head: Tip,
ctx: &mut BlockContext<'_>,
) -> Result<(), Error> {
) -> Result<Option<Tip>, Error> {
if headers.is_empty() {
return Ok(());
return Ok(None);
}
let last_header = headers.last().expect("last header");

// Check if we know about all these headers. If so we can accept them quickly.
// If they *do not* increase total work on the sync chain we are done.
// If they *do* increase total work then we should process them to update sync_head.
let head = {
let hash = ctx.header_pmmr.head_hash()?;
let header = ctx.batch.get_block_header(&hash)?;
Tip::from_header(&header)
};

if let Ok(existing) = ctx.batch.get_block_header(&last_header.hash()) {
if !has_more_work(&existing, &head) {
return Ok(());
}
}
let head = ctx.batch.header_head()?;

// Validate each header in the chunk and add to our db.
// Note: This batch may be rolled back later if the MMR does not validate successfully.
// Note: This batch may later be committed even if the MMR itself is rollbacked.
for header in headers {
validate_header(header, ctx)?;
add_block_header(header, &ctx.batch)?;
}

// Now apply this entire chunk of headers to the sync MMR (ctx is sync MMR specific).
// Now apply this entire chunk of headers to the header MMR.
txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| {
rewind_and_apply_header_fork(&last_header, ext, batch)?;
Ok(())
})?;

if has_more_work(last_header, &head) {
update_header_head(&Tip::from_header(last_header), &mut ctx.batch)?;
}
// If previous sync_head is not on the "current" chain then
// these headers are on an alternative fork to sync_head.
let alt_fork = !ext.is_on_current_chain(sync_head, batch)?;

// Update our "header_head" if this batch results in an increase in total work.
// Otherwise rollback this header extension.
// Note the outer batch may still be committed to db assuming no errors occur in the extension.
if has_more_work(last_header, &head) {
let header_head = last_header.into();
update_header_head(&header_head, &batch)?;
} else {
ext.force_rollback();
};

Ok(())
if alt_fork || has_more_work(last_header, &sync_head) {
Ok(Some(last_header.into()))
} else {
Ok(None)
}
})
}

/// Process a block header. Update the header MMR and corresponding header_head if this header
Expand Down Expand Up @@ -500,7 +504,7 @@ fn add_block_header(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Er
Ok(())
}

fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
fn update_header_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> {
batch
.save_header_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save header head".to_owned()))?;
Expand All @@ -513,7 +517,7 @@ fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Er
Ok(())
}

fn update_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> {
fn update_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> {
batch
.save_body_head(&head)
.map_err(|e| ErrorKind::StoreErr(e, "pipe save body".to_owned()))?;
Expand All @@ -536,7 +540,7 @@ pub fn rewind_and_apply_header_fork(
) -> Result<(), Error> {
let mut fork_hashes = vec![];
let mut current = header.clone();
while current.height > 0 && ext.is_on_current_chain(&current, batch).is_err() {
while current.height > 0 && !ext.is_on_current_chain(&current, batch)? {
fork_hashes.push(current.hash());
current = batch.get_previous_header(&current)?;
}
Expand Down Expand Up @@ -577,11 +581,7 @@ pub fn rewind_and_apply_fork(

// Rewind the txhashset extension back to common ancestor based on header MMR.
let mut current = batch.head_header()?;
while current.height > 0
&& header_extension
.is_on_current_chain(&current, batch)
.is_err()
{
while current.height > 0 && !header_extension.is_on_current_chain(&current, batch)? {
current = batch.get_previous_header(&current)?;
}
let fork_point = current;
Expand Down
29 changes: 16 additions & 13 deletions chain/src/txhashset/txhashset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,13 @@ impl<'a> HeaderExtension<'a> {
self.head.clone()
}

/// Get header hash by height.
/// Based on current header MMR.
pub fn get_header_hash_by_height(&self, height: u64) -> Option<Hash> {
let pos = pmmr::insertion_to_pmmr_index(height + 1);
self.get_header_hash(pos)
}

/// Get the header at the specified height based on the current state of the header extension.
/// Derives the MMR pos from the height (insertion index) and retrieves the header hash.
/// Looks the header up in the db by hash.
Expand All @@ -923,8 +930,7 @@ impl<'a> HeaderExtension<'a> {
height: u64,
batch: &Batch<'_>,
) -> Result<BlockHeader, Error> {
let pos = pmmr::insertion_to_pmmr_index(height + 1);
if let Some(hash) = self.get_header_hash(pos) {
if let Some(hash) = self.get_header_hash_by_height(height) {
Ok(batch.get_block_header(&hash)?)
} else {
Err(ErrorKind::Other("get header by height".to_string()).into())
Expand All @@ -933,20 +939,17 @@ impl<'a> HeaderExtension<'a> {

/// Compares the provided header to the header in the header MMR at that height.
/// If these match we know the header is on the current chain.
pub fn is_on_current_chain(
pub fn is_on_current_chain<T: Into<Tip>>(
&self,
header: &BlockHeader,
t: T,
batch: &Batch<'_>,
) -> Result<(), Error> {
if header.height > self.head.height {
return Err(ErrorKind::Other("not on current chain, out beyond".to_string()).into());
}
let chain_header = self.get_header_by_height(header.height, batch)?;
if chain_header.hash() == header.hash() {
Ok(())
} else {
Err(ErrorKind::Other("not on current chain".to_string()).into())
) -> Result<bool, Error> {
let t = t.into();
if t.height > self.head.height {
return Ok(false);
}
let chain_header = self.get_header_by_height(t.height, batch)?;
Ok(chain_header.hash() == t.hash())
}

/// Force the rollback of this extension, no matter the result.
Expand Down
36 changes: 27 additions & 9 deletions chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bitflags! {
}

/// Various status sync can be in, whether it's fast sync or archival.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Clone, Copy, PartialEq, Deserialize, Serialize)]
pub enum SyncStatus {
/// Initial State (we do not yet know if we are/should be syncing)
Initial,
Expand All @@ -49,10 +49,12 @@ pub enum SyncStatus {
AwaitingPeers(bool),
/// Downloading block headers
HeaderSync {
/// current node height
current_height: u64,
/// current sync head
sync_head: Tip,
/// height of the most advanced peer
highest_height: u64,
/// diff of the most advanced peer
highest_diff: Difficulty,
},
/// Downloading the various txhashsets
TxHashsetDownload(TxHashsetDownloadStats),
Expand Down Expand Up @@ -176,6 +178,17 @@ impl SyncState {
}
}

/// Update sync_head if state is currently HeaderSync.
pub fn update_header_sync(&self, new_sync_head: Tip) {
let status: &mut SyncStatus = &mut self.current.write();
match status {
SyncStatus::HeaderSync { sync_head, .. } => {
*sync_head = new_sync_head;
}
_ => (),
}
}

/// Update txhashset downloading progress
pub fn update_txhashset_download(&self, stats: TxHashsetDownloadStats) {
*self.current.write() = SyncStatus::TxHashsetDownload(stats);
Expand Down Expand Up @@ -346,12 +359,7 @@ pub struct Tip {
impl Tip {
/// Creates a new tip based on provided header.
pub fn from_header(header: &BlockHeader) -> Tip {
Tip {
height: header.height,
last_block_h: header.hash(),
prev_block_h: header.prev_hash,
total_difficulty: header.total_difficulty(),
}
header.into()
}
}

Expand All @@ -372,6 +380,16 @@ impl Default for Tip {
}
}
}
impl From<&BlockHeader> for Tip {
fn from(header: &BlockHeader) -> Tip {
Tip {
height: header.height,
last_block_h: header.hash(),
prev_block_h: header.prev_hash,
total_difficulty: header.total_difficulty(),
}
}
}

/// Serialization of a tip, required to save to datastore.
impl ser::Writeable for Tip {
Expand Down
25 changes: 22 additions & 3 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,28 @@ where
return Ok(false);
}

// try to add headers to our header chain
match self.chain().sync_block_headers(bhs, chain::Options::SYNC) {
Ok(_) => Ok(true),
// Read our sync_head if we are in header_sync.
// If not then we can ignore this batch of headers.
let sync_head = match self.sync_state.status() {
SyncStatus::HeaderSync { sync_head, .. } => sync_head,
_ => {
debug!("headers_received: ignoring as not in header_sync");
return Ok(true);
}
};

match self
.chain()
.sync_block_headers(bhs, sync_head, chain::Options::SYNC)
{
Ok(sync_head) => {
// If we have an updated sync_head after processing this batch of headers
// then update our sync_state so we can request relevant headers in the next batch.
if let Some(sync_head) = sync_head {
self.sync_state.update_header_sync(sync_head);
}
Ok(true)
}
Err(e) => {
debug!("Block headers refused by chain: {:?}", e);
if e.is_bad_data() {
Expand Down
Loading