Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Moved syncing log out of the client #1670

Merged
merged 1 commit into from
Jul 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ pub trait ChainNotify : Send + Sync {
_invalid: Vec<H256>,
_enacted: Vec<H256>,
_retracted: Vec<H256>,
_sealed: Vec<H256>) {
_sealed: Vec<H256>,
_duration: u64) {
// does nothing by default
}

Expand Down
94 changes: 32 additions & 62 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ use std::sync::{Arc, Weak};
use std::path::{Path, PathBuf};
use std::fmt;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use std::time::{Instant};
use time::precise_time_ns;

// util
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock, Colour};
use util::{journaldb, rlp, Bytes, Stream, View, PerfTimer, Itertools, Mutex, RwLock};
use util::journaldb::JournalDB;
use util::rlp::{RlpStream, Rlp, UntrustedRlp};
use util::numbers::*;
Expand Down Expand Up @@ -135,10 +135,8 @@ pub struct Client {
sleep_state: Mutex<SleepState>,
liveness: AtomicBool,
io_channel: IoChannel<ClientIoMessage>,
notify: RwLock<Option<Weak<ChainNotify>>>,
notify: RwLock<Vec<Weak<ChainNotify>>>,
queue_transactions: AtomicUsize,
skipped: AtomicUsize,
last_import: Mutex<Instant>,
last_hashes: RwLock<VecDeque<H256>>,
}

Expand Down Expand Up @@ -229,24 +227,24 @@ impl Client {
trie_factory: TrieFactory::new(config.trie_spec),
miner: miner,
io_channel: message_channel,
notify: RwLock::new(None),
notify: RwLock::new(Vec::new()),
queue_transactions: AtomicUsize::new(0),
skipped: AtomicUsize::new(0),
last_import: Mutex::new(Instant::now()),
last_hashes: RwLock::new(VecDeque::new()),
};
Ok(Arc::new(client))
}

/// Sets the actor to be notified on certain events
pub fn set_notify(&self, target: &Arc<ChainNotify>) {
let mut write_lock = self.notify.write();
*write_lock = Some(Arc::downgrade(target));
/// Adds an actor to be notified on certain events
pub fn add_notify(&self, target: &Arc<ChainNotify>) {
self.notify.write().push(Arc::downgrade(target));
}

fn notify(&self) -> Option<Arc<ChainNotify>> {
let read_lock = self.notify.read();
read_lock.as_ref().and_then(|weak| weak.upgrade())
fn notify<F>(&self, f: F) where F: Fn(&ChainNotify) {
for np in self.notify.read().iter() {
if let Some(n) = np.upgrade() {
f(&*n);
}
}
}

/// Flush the block import queue.
Expand Down Expand Up @@ -357,28 +355,24 @@ impl Client {
/// This is triggered by a message coming from a block queue when the block is ready for insertion
pub fn import_verified_blocks(&self) -> usize {
let max_blocks_to_import = 64;
let (imported_blocks, import_results, invalid_blocks, original_best, imported) = {
let (imported_blocks, import_results, invalid_blocks, original_best, imported, duration) = {
let mut imported_blocks = Vec::with_capacity(max_blocks_to_import);
let mut invalid_blocks = HashSet::new();
let mut import_results = Vec::with_capacity(max_blocks_to_import);

let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_verified_blocks");
let start = precise_time_ns();
let blocks = self.block_queue.drain(max_blocks_to_import);

let original_best = self.chain_info().best_block_hash;

for block in blocks {
let header = &block.header;
let start = precise_time_ns();

if invalid_blocks.contains(&header.parent_hash) {
invalid_blocks.insert(header.hash());
continue;
}
let tx_count = block.transactions.len();
let size = block.bytes.len();

let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
invalid_blocks.insert(header.hash());
Expand All @@ -392,30 +386,6 @@ impl Client {
import_results.push(route);

self.report.write().accrue_block(&block);

let duration_ns = precise_time_ns() - start;

let mut last_import = self.last_import.lock();
if Instant::now() > *last_import + Duration::from_secs(1) {
let queue_info = self.queue_info();
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
if !importing {
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
Colour::White.bold().paint(format!("#{}", header.number())),
Colour::White.bold().paint(format!("{}", header.hash())),
Colour::Yellow.bold().paint(format!("{}", tx_count)),
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
Colour::Purple.bold().paint(format!("{:.2}", duration_ns as f32 / 1000000f32)),
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
);
*last_import = Instant::now();
}
self.skipped.store(0, AtomicOrdering::Relaxed);
} else {
self.skipped.fetch_add(1, AtomicOrdering::Relaxed);
}
}

let imported = imported_blocks.len();
Expand All @@ -429,7 +399,8 @@ impl Client {
self.block_queue.mark_as_good(&imported_blocks);
}
}
(imported_blocks, import_results, invalid_blocks, original_best, imported)
let duration_ns = precise_time_ns() - start;
(imported_blocks, import_results, invalid_blocks, original_best, imported, duration_ns)
};

{
Expand All @@ -440,15 +411,16 @@ impl Client {
self.miner.chain_new_blocks(self, &imported_blocks, &invalid_blocks, &enacted, &retracted);
}

if let Some(notify) = self.notify() {
self.notify(|notify| {
notify.new_blocks(
imported_blocks,
invalid_blocks,
enacted,
retracted,
imported_blocks.clone(),
invalid_blocks.clone(),
enacted.clone(),
retracted.clone(),
Vec::new(),
duration,
);
}
});
}
}

Expand Down Expand Up @@ -640,9 +612,7 @@ impl Client {
fn wake_up(&self) {
if !self.liveness.load(AtomicOrdering::Relaxed) {
self.liveness.store(true, AtomicOrdering::Relaxed);
if let Some(notify) = self.notify() {
notify.start();
}
self.notify(|n| n.start());
trace!(target: "mode", "wake_up: Waking.");
}
}
Expand All @@ -652,9 +622,7 @@ impl Client {
// only sleep if the import queue is mostly empty.
if self.queue_info().total_queue_size() <= MAX_QUEUE_SIZE_TO_SLEEP_ON {
self.liveness.store(false, AtomicOrdering::Relaxed);
if let Some(notify) = self.notify() {
notify.stop();
}
self.notify(|n| n.stop());
trace!(target: "mode", "sleep: Sleeping.");
} else {
trace!(target: "mode", "sleep: Cannot sleep - syncing ongoing.");
Expand Down Expand Up @@ -1029,6 +997,7 @@ impl MiningBlockChainClient for Client {
fn import_sealed_block(&self, block: SealedBlock) -> ImportResult {
let _import_lock = self.import_lock.lock();
let _timer = PerfTimer::new("import_sealed_block");
let start = precise_time_ns();

let original_best = self.chain_info().best_block_hash;

Expand All @@ -1043,15 +1012,16 @@ impl MiningBlockChainClient for Client {
let (enacted, retracted) = self.calculate_enacted_retracted(&[route]);
self.miner.chain_new_blocks(self, &[h.clone()], &[], &enacted, &retracted);

if let Some(notify) = self.notify() {
self.notify(|notify| {
notify.new_blocks(
vec![h.clone()],
vec![],
enacted,
retracted,
enacted.clone(),
retracted.clone(),
vec![h.clone()],
precise_time_ns() - start,
);
}
});
}

if self.chain_info().best_block_hash != original_best {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ impl ClientService {
}

/// Set the actor to be notified on certain chain events
pub fn set_notify(&self, notify: &Arc<ChainNotify>) {
self.client.set_notify(notify);
pub fn add_notify(&self, notify: &Arc<ChainNotify>) {
self.client.add_notify(notify);
}
}

Expand Down
91 changes: 64 additions & 27 deletions parity/informant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ extern crate ansi_term;
use self::ansi_term::Colour::{White, Yellow, Green, Cyan, Blue};
use self::ansi_term::Style;

use std::sync::{Arc};
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::time::{Instant, Duration};
use std::ops::{Deref, DerefMut};
use isatty::{stdout_isatty};
use ethsync::{SyncStatus, NetworkConfiguration};
use util::{Uint, RwLock};
use ethsync::{SyncProvider, ManageNetwork};
use util::{Uint, RwLock, Mutex, H256, Colour};
use ethcore::client::*;
use ethcore::views::BlockView;
use number_prefix::{binary_prefix, Standalone, Prefixed};

pub struct Informant {
Expand All @@ -32,18 +35,11 @@ pub struct Informant {
report: RwLock<Option<ClientReport>>,
last_tick: RwLock<Instant>,
with_color: bool,
}

impl Default for Informant {
fn default() -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
with_color: true,
}
}
client: Arc<Client>,
sync: Option<Arc<SyncProvider>>,
net: Option<Arc<ManageNetwork>>,
last_import: Mutex<Instant>,
skipped: AtomicUsize,
}

trait MillisecondDuration {
Expand All @@ -58,13 +54,18 @@ impl MillisecondDuration for Duration {

impl Informant {
/// Make a new instance potentially `with_color` output.
pub fn new(with_color: bool) -> Self {
pub fn new(client: Arc<Client>, sync: Option<Arc<SyncProvider>>, net: Option<Arc<ManageNetwork>>, with_color: bool) -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
last_tick: RwLock::new(Instant::now()),
with_color: with_color,
client: client,
sync: sync,
net: net,
last_import: Mutex::new(Instant::now()),
skipped: AtomicUsize::new(0),
}
}

Expand All @@ -77,25 +78,28 @@ impl Informant {


#[cfg_attr(feature="dev", allow(match_bool))]
pub fn tick(&self, client: &Client, maybe_status: Option<(SyncStatus, NetworkConfiguration)>) {
pub fn tick(&self) {
let elapsed = self.last_tick.read().elapsed();
if elapsed < Duration::from_secs(5) {
return;
}

let chain_info = client.chain_info();
let queue_info = client.queue_info();
let cache_info = client.blockchain_cache_info();
let chain_info = self.client.chain_info();
let queue_info = self.client.queue_info();
let cache_info = self.client.blockchain_cache_info();
let network_config = self.net.as_ref().map(|n| n.network_config());
let sync_status = self.sync.as_ref().map(|s| s.status());

let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3;
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
|| self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
if !importing && elapsed < Duration::from_secs(30) {
return;
}

*self.last_tick.write() = Instant::now();

let mut write_report = self.report.write();
let report = client.report();
let report = self.client.report();

let paint = |c: Style, t: String| match self.with_color && stdout_isatty() {
true => format!("{}", c.paint(t)),
Expand All @@ -120,8 +124,8 @@ impl Informant {
),
false => String::new(),
},
match maybe_status {
Some((ref sync_info, ref net_config)) => format!("{}{}/{}/{} peers",
match (&sync_status, &network_config) {
(&Some(ref sync_info), &Some(ref net_config)) => format!("{}{}/{}/{} peers",
match importing {
true => format!("{} ", paint(Green.bold(), format!("{:>8}", format!("#{}", sync_info.last_imported_block_number.unwrap_or(chain_info.best_block_number))))),
false => String::new(),
Expand All @@ -130,14 +134,14 @@ impl Informant {
paint(Cyan.bold(), format!("{:2}", sync_info.num_peers)),
paint(Cyan.bold(), format!("{:2}", net_config.ideal_peers))
),
None => String::new(),
_ => String::new(),
},
format!("{} db {} chain {} queue{}",
format!("{} db {} chain {} queue{}",
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(report.state_db_mem))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(cache_info.total()))),
paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(queue_info.mem_used))),
match maybe_status {
Some((ref sync_info, _)) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))),
match sync_status {
Some(ref sync_info) => format!(" {} sync", paint(Blue.bold(), format!("{:>8}", Informant::format_bytes(sync_info.mem_used)))),
_ => String::new(),
}
)
Expand All @@ -149,3 +153,36 @@ impl Informant {
}
}

impl ChainNotify for Informant {
fn new_blocks(&self, _imported: Vec<H256>, _invalid: Vec<H256>, enacted: Vec<H256>, _retracted: Vec<H256>, _sealed: Vec<H256>, duration: u64) {
let mut last_import = self.last_import.lock();
if Instant::now() > *last_import + Duration::from_secs(1) {
let queue_info = self.client.queue_info();
let importing = queue_info.unverified_queue_size + queue_info.verified_queue_size > 3
|| self.sync.as_ref().map_or(false, |s| s.status().is_major_syncing());
if !importing {
if let Some(block) = enacted.last().and_then(|h| self.client.block(BlockID::Hash(h.clone()))) {
let view = BlockView::new(&block);
let header = view.header();
let tx_count = view.transactions_count();
let size = block.len();
let skipped = self.skipped.load(AtomicOrdering::Relaxed);
info!(target: "import", "Imported {} {} ({} txs, {} Mgas, {} ms, {} KiB){}",
Colour::White.bold().paint(format!("#{}", header.number())),
Colour::White.bold().paint(format!("{}", header.hash())),
Colour::Yellow.bold().paint(format!("{}", tx_count)),
Colour::Yellow.bold().paint(format!("{:.2}", header.gas_used.low_u64() as f32 / 1000000f32)),
Colour::Purple.bold().paint(format!("{:.2}", duration as f32 / 1000000f32)),
Colour::Blue.bold().paint(format!("{:.2}", size as f32 / 1024f32)),
if skipped > 0 { format!(" + another {} block(s)", Colour::Red.bold().paint(format!("{}", skipped))) } else { String::new() }
);
*last_import = Instant::now();
}
}
self.skipped.store(0, AtomicOrdering::Relaxed);
} else {
self.skipped.fetch_add(enacted.len(), AtomicOrdering::Relaxed);
}
}
}

Loading