Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Mutithreaded IO #198

Merged
merged 12 commits into from
Jan 22, 2016
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ time = "0.1"
evmjit = { path = "rust-evmjit", optional = true }
ethash = { path = "ethash" }
num_cpus = "0.2"
ctrlc = "1.0"

[features]
jit = ["evmjit"]
Expand Down
12 changes: 7 additions & 5 deletions ethash/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ impl EthashManager {
/// `nonce` - The nonce to pack into the mix
pub fn compute_light(&self, block_number: u64, header_hash: &H256, nonce: u64) -> ProofOfWork {
let epoch = block_number / ETHASH_EPOCH_LENGTH;
if !self.lights.read().unwrap().contains_key(&epoch) {
let mut lights = self.lights.write().unwrap(); // obtain write lock
if !lights.contains_key(&epoch) {
let light = Light::new(block_number);
lights.insert(epoch, light);
while !self.lights.read().unwrap().contains_key(&epoch) {
if let Ok(mut lights) = self.lights.try_write()
{
if !lights.contains_key(&epoch) {
let light = Light::new(block_number);
lights.insert(epoch, light);
}
}
}
self.lights.read().unwrap().get(&epoch).unwrap().compute(header_hash, nonce)
Expand Down
81 changes: 50 additions & 31 deletions src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ extern crate ethcore;
extern crate rustc_serialize;
extern crate log;
extern crate env_logger;
extern crate ctrlc;

use std::io::stdin;
use std::env;
use log::{LogLevelFilter};
use env_logger::LogBuilder;
use ctrlc::CtrlC;
use util::*;
use ethcore::client::*;
use ethcore::service::ClientService;
use ethcore::service::{ClientService, NetSyncMessage};
use ethcore::ethereum;
use ethcore::blockchain::CacheSize;
use ethcore::sync::*;
use ethcore::sync::EthSync;

fn setup_log() {
let mut builder = LogBuilder::new();
Expand All @@ -30,70 +31,88 @@ fn main() {
setup_log();
let spec = ethereum::new_frontier();
let mut service = ClientService::start(spec).unwrap();
let io_handler = Box::new(ClientIoHandler { client: service.client(), timer: 0, info: Default::default() });
let io_handler = Arc::new(ClientIoHandler { client: service.client(), info: Default::default(), sync: service.sync() });
service.io().register_handler(io_handler).expect("Error registering IO handler");
loop {
let mut cmd = String::new();
stdin().read_line(&mut cmd).unwrap();
if cmd == "quit\n" || cmd == "exit\n" || cmd == "q\n" {
break;
}
}

let exit = Arc::new(Condvar::new());
let e = exit.clone();
CtrlC::set_handler(move || {
e.notify_all();
});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indention


let mutex = Mutex::new(());
let _ = exit.wait(mutex.lock().unwrap()).unwrap();
}

#[derive(Default, Debug)]
struct Informant {
chain_info: Option<BlockChainInfo>,
cache_info: Option<CacheSize>,
report: Option<ClientReport>,
chain_info: RwLock<Option<BlockChainInfo>>,
cache_info: RwLock<Option<CacheSize>>,
report: RwLock<Option<ClientReport>>,
}

impl Default for Informant {
fn default() -> Self {
Informant {
chain_info: RwLock::new(None),
cache_info: RwLock::new(None),
report: RwLock::new(None),
}
}
}

impl Informant {
pub fn tick(&mut self, client: &Client) {
pub fn tick(&self, client: &Client, sync: &EthSync) {
// 5 seconds betwen calls. TODO: calculate this properly.
let dur = 5usize;

let chain_info = client.chain_info();
let queue_info = client.queue_info();
let cache_info = client.cache_info();
let report = client.report();
let sync_info = sync.status();

if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (&self.chain_info, &self.cache_info, &self.report) {
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //···{}···// {} ({}) bl {} ({}) ex ]",
if let (_, &Some(ref last_cache_info), &Some(ref last_report)) = (self.chain_info.read().unwrap().deref(), self.cache_info.read().unwrap().deref(), self.report.read().unwrap().deref()) {
println!("[ {} {} ]---[ {} blk/s | {} tx/s | {} gas/s //··· {}/{} peers, {} downloaded, {} queued ···// {} ({}) bl {} ({}) ex ]",
chain_info.best_block_number,
chain_info.best_block_hash,
(report.blocks_imported - last_report.blocks_imported) / dur,
(report.transactions_applied - last_report.transactions_applied) / dur,
(report.gas_processed - last_report.gas_processed) / From::from(dur),
0, // TODO: peers

sync_info.num_active_peers,
sync_info.num_peers,
sync_info.blocks_received,
queue_info.queue_size,

cache_info.blocks,
cache_info.blocks as isize - last_cache_info.blocks as isize,
cache_info.block_details,
cache_info.block_details as isize - last_cache_info.block_details as isize
);
}

self.chain_info = Some(chain_info);
self.cache_info = Some(cache_info);
self.report = Some(report);
*self.chain_info.write().unwrap().deref_mut() = Some(chain_info);
*self.cache_info.write().unwrap().deref_mut() = Some(cache_info);
*self.report.write().unwrap().deref_mut() = Some(report);
}
}

const INFO_TIMER: TimerToken = 0;

struct ClientIoHandler {
client: Arc<RwLock<Client>>,
timer: TimerToken,
client: Arc<Client>,
sync: Arc<EthSync>,
info: Informant,
}

impl IoHandler<NetSyncMessage> for ClientIoHandler {
fn initialize<'s>(&'s mut self, io: &mut IoContext<'s, NetSyncMessage>) {
self.timer = io.register_timer(5000).expect("Error registering timer");
fn initialize(&self, io: &IoContext<NetSyncMessage>) {
io.register_timer(INFO_TIMER, 5000).expect("Error registering timer");
}

fn timeout<'s>(&'s mut self, _io: &mut IoContext<'s, NetSyncMessage>, timer: TimerToken) {
if self.timer == timer {
let client = self.client.read().unwrap();
client.tick();
self.info.tick(client.deref());
fn timeout(&self, _io: &IoContext<NetSyncMessage>, timer: TimerToken) {
if INFO_TIMER == timer {
self.info.tick(&self.client, &self.sync);
}
}
}
Expand Down
35 changes: 29 additions & 6 deletions src/queue.rs → src/block_queue.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
//! A queue of blocks. Sits between network or other I/O and the BlockChain.
//! Sorts them ready for blockchain insertion.
use std::thread::{JoinHandle, self};
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use util::*;
use verification::*;
use error::*;
use engine::Engine;
use sync::*;
use views::*;
use header::*;
use service::*;

/// Block queue status
#[derive(Debug)]
pub struct BlockQueueInfo {
/// Indicates that queue is full
pub full: bool,
/// Number of queued blocks
pub queue_size: usize,
}

/// A queue of blocks. Sits between network or other I/O and the BlockChain.
/// Sorts them ready for blockchain insertion.
Expand Down Expand Up @@ -63,14 +74,15 @@ impl BlockQueue {
let deleting = Arc::new(AtomicBool::new(false));

let mut verifiers: Vec<JoinHandle<()>> = Vec::new();
let thread_count = max(::num_cpus::get(), 2) - 1;
for _ in 0..thread_count {
let thread_count = max(::num_cpus::get(), 3) - 2;
for i in 0..thread_count {
let verification = verification.clone();
let engine = engine.clone();
let more_to_verify = more_to_verify.clone();
let ready_signal = ready_signal.clone();
let deleting = deleting.clone();
verifiers.push(thread::spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting)));
verifiers.push(thread::Builder::new().name(format!("Verifier #{}", i)).spawn(move || BlockQueue::verify(verification, engine, more_to_verify, ready_signal, deleting))
.expect("Error starting block verification thread"));
}
BlockQueue {
engine: engine,
Expand Down Expand Up @@ -204,7 +216,7 @@ impl BlockQueue {
verification.verified = new_verified;
}

/// TODO [arkpar] Please document me
/// Removes up to `max` verified blocks from the queue
pub fn drain(&mut self, max: usize) -> Vec<PreVerifiedBlock> {
let mut verification = self.verification.lock().unwrap();
let count = min(max, verification.verified.len());
Expand All @@ -215,8 +227,19 @@ impl BlockQueue {
result.push(block);
}
self.ready_signal.reset();
if !verification.verified.is_empty() {
self.ready_signal.set();
}
result
}

/// Get queue status.
pub fn queue_info(&self) -> BlockQueueInfo {
BlockQueueInfo {
full: false,
queue_size: self.verification.lock().unwrap().unverified.len(),
}
}
}

impl Drop for BlockQueue {
Expand All @@ -234,7 +257,7 @@ impl Drop for BlockQueue {
mod tests {
use util::*;
use spec::*;
use queue::*;
use block_queue::*;

#[test]
fn test_block_queue() {
Expand Down
24 changes: 13 additions & 11 deletions src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,19 +342,19 @@ impl BlockChain {
Some(h) => h,
None => return None,
};
Some(self._tree_route((from_details, from), (to_details, to)))
Some(self._tree_route((&from_details, &from), (&to_details, &to)))
}

/// Similar to `tree_route` function, but can be used to return a route
/// between blocks which may not be in database yet.
fn _tree_route(&self, from: (BlockDetails, H256), to: (BlockDetails, H256)) -> TreeRoute {
fn _tree_route(&self, from: (&BlockDetails, &H256), to: (&BlockDetails, &H256)) -> TreeRoute {
let mut from_branch = vec![];
let mut to_branch = vec![];

let mut from_details = from.0;
let mut to_details = to.0;
let mut current_from = from.1;
let mut current_to = to.1;
let mut from_details = from.0.clone();
let mut to_details = to.0.clone();
let mut current_from = from.1.clone();
let mut current_to = to.1.clone();

// reset from && to to the same level
while from_details.number > to_details.number {
Expand Down Expand Up @@ -409,7 +409,7 @@ impl BlockChain {

// store block in db
self.blocks_db.put(&hash, &bytes).unwrap();
let (batch, new_best) = self.block_to_extras_insert_batch(bytes);
let (batch, new_best, details) = self.block_to_extras_insert_batch(bytes);

// update best block
let mut best_block = self.best_block.write().unwrap();
Expand All @@ -420,14 +420,16 @@ impl BlockChain {
// update caches
let mut write = self.block_details.write().unwrap();
write.remove(&header.parent_hash());
write.insert(hash.clone(), details);
self.note_used(CacheID::Block(hash));

// update extras database
self.extras_db.write(batch).unwrap();
}

/// Transforms block into WriteBatch that may be written into database
/// Additionally, if it's new best block it returns new best block object.
fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>) {
fn block_to_extras_insert_batch(&self, bytes: &[u8]) -> (WriteBatch, Option<BestBlock>, BlockDetails) {
// create views onto rlp
let block = BlockView::new(bytes);
let header = block.header_view();
Expand Down Expand Up @@ -459,15 +461,15 @@ impl BlockChain {

// if it's not new best block, just return
if !is_new_best {
return (batch, None);
return (batch, None, details);
}

// if its new best block we need to make sure that all ancestors
// are moved to "canon chain"
// find the route between old best block and the new one
let best_hash = self.best_block_hash();
let best_details = self.block_details(&best_hash).expect("best block hash is invalid!");
let route = self._tree_route((best_details, best_hash), (details, hash.clone()));
let route = self._tree_route((&best_details, &best_hash), (&details, &hash));

match route.blocks.len() {
// its our parent
Expand All @@ -494,7 +496,7 @@ impl BlockChain {
total_difficulty: total_difficulty
};

(batch, Some(best_block))
(batch, Some(best_block), details)
}

/// Returns true if transaction is known.
Expand Down
Loading