Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix the status marking problem of header sync #2857

Merged
merged 1 commit into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions sync/src/synchronizer/headers_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,11 @@ impl<'a> HeadersProcess<'a> {

if headers.is_empty() {
debug!("HeadersProcess is_empty (synchronized)");
let ibd = self.active_chain.is_initial_block_download();
if !ibd {
if let Some(ref mut peer_state) =
self.synchronizer.peers().state.get_mut(&self.peer)
{
peer_state.stop_headers_sync();
}
if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
self.synchronizer
.shared()
.state()
.tip_synced(state.value_mut());
}
return Status::ok();
}
Expand Down Expand Up @@ -180,16 +178,19 @@ impl<'a> HeadersProcess<'a> {
let start = headers.last().expect("empty checked");
self.active_chain
.send_getheaders_to_peer(self.nc, self.peer, start);
} else if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
self.synchronizer
.shared()
.state()
.tip_synced(state.value_mut());
}

// If we're in IBD, we want outbound peers that will serve us a useful
// chain. Disconnect peers that are on chains with insufficient work.
let peer_flags = self
.synchronizer
.peers()
.state
.get(&self.peer)
.map(|state| state.peer_flags)
.get_flag(self.peer)
.unwrap_or_default();
if self.active_chain.is_initial_block_download()
&& headers.len() != MAX_HEADERS_LEN
Expand Down
16 changes: 7 additions & 9 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200);
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);

Expand Down Expand Up @@ -503,13 +503,11 @@ impl Synchronizer {
}
{
if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
if !peer_state.sync_started() {
peer_state.start_sync(HeadersSyncController::from_header(&tip));
self.shared()
.state()
.n_sync_started()
.fetch_add(1, Ordering::Release);
}
peer_state.start_sync(HeadersSyncController::from_header(&tip));
self.shared()
.state()
.n_sync_started()
.fetch_add(1, Ordering::Release);
}
}

Expand Down Expand Up @@ -544,7 +542,7 @@ impl Synchronizer {
|| state.peer_flags.is_whitelist
|| state.peer_flags.is_protect
}
IBDState::Out => state.sync_started(),
IBDState::Out => state.started_or_tip_synced(),
}
})
.map(|kv_pair| *kv_pair.key())
Expand Down
44 changes: 32 additions & 12 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl ChainSyncState {
HeadersSyncState::Initialized => false,
HeadersSyncState::SyncProtocolConnected => true,
HeadersSyncState::Started => false,
HeadersSyncState::Suspend(until) => until < now,
HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
}
}

Expand All @@ -100,9 +100,22 @@ impl ChainSyncState {
self.headers_sync_state = HeadersSyncState::Suspend(until)
}

fn tip_synced(&mut self) {
let now = unix_time_as_millis();
// use avg block interval: (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2 = 28
self.headers_sync_state = HeadersSyncState::TipSynced(now + 28000);
}

fn started(&self) -> bool {
matches!(self.headers_sync_state, HeadersSyncState::Started)
}

fn started_or_tip_synced(&self) -> bool {
matches!(
self.headers_sync_state,
HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
)
}
}

#[derive(Clone, Debug)]
Expand All @@ -111,6 +124,7 @@ enum HeadersSyncState {
SyncProtocolConnected,
Started,
Suspend(u64), // suspend headers sync until this timestamp (milliseconds since unix epoch)
TipSynced(u64), // already synced to the end, not as the sync target for the time being, until the pause time is exceeded
}

impl Default for HeadersSyncState {
Expand Down Expand Up @@ -287,22 +301,27 @@ impl PeerState {
self.headers_sync_controller = Some(headers_sync_controller);
}

pub fn suspend_sync(&mut self, suspend_time: u64) {
fn suspend_sync(&mut self, suspend_time: u64) {
let now = unix_time_as_millis();
self.chain_sync.suspend(now + suspend_time);
self.stop_headers_sync();
self.headers_sync_controller = None;
}

fn tip_synced(&mut self) {
self.chain_sync.tip_synced();
self.headers_sync_controller = None;
}

pub(crate) fn sync_started(&self) -> bool {
self.chain_sync.started()
}

pub(crate) fn sync_connected(&mut self) {
self.chain_sync.connected()
pub(crate) fn started_or_tip_synced(&self) -> bool {
self.chain_sync.started_or_tip_synced()
}

pub(crate) fn stop_headers_sync(&mut self) {
self.headers_sync_controller = None;
pub(crate) fn sync_connected(&mut self) {
self.chain_sync.connected()
}
}

Expand Down Expand Up @@ -1611,11 +1630,12 @@ impl SyncState {

pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
peer_state.suspend_sync(SUSPEND_SYNC_TIME);
assert_ne!(
self.n_sync_started().fetch_sub(1, Ordering::Release),
0,
"n_sync_started overflow when suspend_sync"
);
self.n_sync_started().fetch_sub(1, Ordering::Release);
}

pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
peer_state.tip_synced();
self.n_sync_started().fetch_sub(1, Ordering::Release);
}

pub fn mark_as_known_tx(&self, hash: Byte32) {
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CellBeingCellDepThenSpentInSameBlockTestSubmitBlock),
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplate),
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplateMultiple),
Box::new(HeaderSyncCycle),
// Test hard fork features
Box::new(CheckAbsoluteEpochSince),
Box::new(CheckRelativeEpochSince),
Expand Down
44 changes: 43 additions & 1 deletion test/src/specs/sync/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ckb_logger::info;
use ckb_network::{bytes::Bytes, extract_peer_id, SupportProtocols};
use ckb_types::{
core::BlockView,
packed::{self, Byte32, SyncMessage},
packed::{self, Byte32, SendHeaders, SyncMessage},
prelude::*,
};
use std::time::Duration;
Expand Down Expand Up @@ -460,6 +460,48 @@ impl Spec for SyncTooNewBlock {
}
}

pub struct HeaderSyncCycle;

impl Spec for HeaderSyncCycle {
crate::setup!(num_nodes: 1);

fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
out_ibd_mode(nodes);

let mut net = Net::new(self.name(), node0.consensus(), vec![SupportProtocols::Sync]);
net.connect(node0);

let send_headers = SendHeaders::new_builder()
.headers(Vec::new().pack())
.build();

let msg = SyncMessage::new_builder()
.set(send_headers)
.build()
.as_bytes();

let ret = net.should_receive(node0, |data: &Bytes| {
SyncMessage::from_slice(&data)
.map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_)))
.unwrap_or(false)
});
assert!(ret, "Test node should receive Getheader message from node");

net.send(node0, SupportProtocols::Sync, msg);

let ret = net.should_receive(node0, |data: &Bytes| {
SyncMessage::from_slice(&data)
.map(|message| matches!(message.to_enum(), packed::SyncMessageUnion::GetHeaders(_)))
.unwrap_or(false)
});
assert!(
ret,
"Test node should receive Getheader message from node twice"
);
}
}

fn build_forks(node: &Node, offsets: &[u64]) -> Vec<BlockView> {
let rpc_client = node.rpc_client();
let mut blocks = Vec::with_capacity(offsets.len());
Expand Down