Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Sync stand-alone binary and feature-gated dependencies refactoring #1637

Merged
merged 19 commits into from
Jul 18, 2016
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ default-features = false
[features]
default = ["dapps", "ethcore-signer/ui"]
dapps = ["ethcore-dapps"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev",
"ethcore-dapps/dev", "ethcore-signer/dev"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"]
travis-beta = ["ethcore/json-tests"]
travis-nightly = ["ethcore/json-tests", "dev"]

[[bin]]
path = "parity/main.rs"
name = "parity"

[[bin]]
path = "parity/sync/main.rs"
name = "sync"

[profile.release]
debug = true
lto = false
14 changes: 14 additions & 0 deletions ethcore/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ fn main() {
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}

// chain notify interface
{
let src = Path::new("src/client/chain_notify.rs");
let intermediate = Path::new(&out_dir).join("chain_notify.intermediate.rs.in");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &src, &intermediate).unwrap();

let dst = Path::new(&out_dir).join("chain_notify.ipc.rs");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}
}
7 changes: 7 additions & 0 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::numbers::*;
use ipc::{IpcConfig, BinaryConvertError};
use std::sync::Arc;
use std::collections::VecDeque;
use std::mem;

/// Represents what has to be handled by actor listening to chain events
#[derive(Ipc)]
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks
fn new_blocks(&self,
Expand All @@ -38,3 +43,5 @@ pub trait ChainNotify : Send + Sync {
// does nothing by default
}
}

impl IpcConfig<ChainNotify> for Arc<ChainNotify> { }
8 changes: 7 additions & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod config;
mod error;
mod test_client;
mod trace;
mod chain_notify;

pub use self::client::*;
pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType};
Expand Down Expand Up @@ -60,6 +59,13 @@ pub mod client {
include!(concat!(env!("OUT_DIR"), "/client.ipc.rs"));
}

pub mod chain_notify {
//! Chain notify interface

#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/chain_notify.ipc.rs"));
}

/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send {

Expand Down
53 changes: 29 additions & 24 deletions ipc/hypervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,52 +33,53 @@ use service::{HypervisorService, IpcModuleId};
use std::process::{Command,Child};
use std::collections::HashMap;

pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID};
pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};

type BinaryId = &'static str;

const CLIENT_BINARY: BinaryId = "client";
pub type BinaryId = &'static str;

pub struct Hypervisor {
ipc_addr: String,
service: Arc<HypervisorService>,
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
processes: RwLock<HashMap<BinaryId, Child>>,
db_path: String,
modules: HashMap<IpcModuleId, (BinaryId, Vec<String>)>,
}

impl Hypervisor {
/// initializes the Hypervisor service with the open ipc socket for incoming clients
pub fn new(db_path: &str) -> Hypervisor {
Hypervisor::with_url(db_path, HYPERVISOR_IPC_URL)
pub fn new() -> Hypervisor {
Hypervisor::with_url(HYPERVISOR_IPC_URL)
}

pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec<String>) -> Hypervisor {
self.modules.insert(module_id, (binary_id, args));
self.service.add_module(module_id);
self
}

/// Starts on the specified address for ipc listener
fn with_url(db_path: &str, addr: &str) -> Hypervisor{
Hypervisor::with_url_and_service(db_path, addr, HypervisorService::new())
pub fn local_module(self, module_id: IpcModuleId) -> Hypervisor {
self.service.add_module(module_id);
self
}

/// Starts with the specified address for the ipc listener and
/// the specified list of modules in form of created service
fn with_url_and_service(db_path: &str, addr: &str, service: Arc<HypervisorService>) -> Hypervisor {
pub fn with_url(addr: &str) -> Hypervisor {
let service = HypervisorService::new();
let worker = nanoipc::Worker::new(&service);
Hypervisor{
ipc_addr: addr.to_owned(),
service: service,
ipc_worker: RwLock::new(worker),
processes: RwLock::new(HashMap::new()),
db_path: db_path.to_owned(),
modules: HashMap::new(),
}
}

/// Since one binary can host multiple modules
/// we match binaries
fn match_module(module_id: &IpcModuleId) -> Option<BinaryId> {
match *module_id {
CLIENT_MODULE_ID => Some(CLIENT_BINARY),
// none means the module is inside the main binary
_ => None
}
fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec<String>)> {
self.modules.get(module_id)
}

/// Creates IPC listener and starts all binaries
Expand All @@ -95,7 +96,7 @@ impl Hypervisor {
/// Does nothing when it is already started on module is inside the
/// main binary
fn start_module(&self, module_id: IpcModuleId) {
Self::match_module(&module_id).map(|binary_id| {
self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| {
let mut processes = self.processes.write().unwrap();
{
if processes.get(binary_id).is_some() {
Expand All @@ -108,7 +109,12 @@ impl Hypervisor {
executable_path.pop();
executable_path.push(binary_id);

let child = Command::new(&executable_path.to_str().unwrap()).arg(&self.db_path).spawn().unwrap_or_else(
let mut command = Command::new(&executable_path.to_str().unwrap());
for arg in binary_args { command.arg(arg); }

trace!(target: "hypervisor", "Spawn executable: {:?}", command);

let child = command.spawn().unwrap_or_else(
|e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e));
processes.insert(binary_id, child);
});
Expand All @@ -132,7 +138,7 @@ impl Hypervisor {

let mut childs = self.processes.write().unwrap();
for (ref mut binary, ref mut child) in childs.iter_mut() {
trace!(target: "hypervisor", "HYPERVISOR: Stopping process module: {}", binary);
trace!(target: "hypervisor", "Stopping process module: {}", binary);
child.kill().unwrap();
}
}
Expand All @@ -149,15 +155,14 @@ mod tests {
use super::*;
use std::sync::atomic::{AtomicBool,Ordering};
use std::sync::Arc;
use super::service::*;
use nanoipc;

#[test]
fn can_init() {
let url = "ipc:///tmp/test-parity-hypervisor-10.ipc";
let test_module_id = 8080u64;

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
assert_eq!(false, hypervisor.modules_ready());
}

Expand All @@ -177,7 +182,7 @@ mod tests {
client.module_ready(test_module_id);
});

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
hypervisor.start();
hypervisor_ready_local.store(true, Ordering::Relaxed);
hypervisor.wait_for_startup();
Expand Down
9 changes: 8 additions & 1 deletion ipc/hypervisor/src/service.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub type IpcModuleId = u64;
/// Blockhain database module id
pub const CLIENT_MODULE_ID: IpcModuleId = 2000;

/// Sync module id
pub const SYNC_MODULE_ID: IpcModuleId = 2100;

/// IPC service that handles module management
pub struct HypervisorService {
check_list: RwLock<HashMap<IpcModuleId, bool>>,
Expand All @@ -43,7 +46,7 @@ impl HypervisorService {
impl HypervisorService {
/// New service with the default list of modules
pub fn new() -> Arc<HypervisorService> {
HypervisorService::with_modules(vec![CLIENT_MODULE_ID])
HypervisorService::with_modules(vec![])
}

/// New service with list of modules that will report for being ready
Expand All @@ -57,6 +60,10 @@ impl HypervisorService {
})
}

pub fn add_module(&self, module_id: IpcModuleId) {
self.check_list.write().unwrap().insert(module_id, false);
}

/// Number of modules still being waited for check-in
pub fn unchecked_count(&self) -> usize {
self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
Expand Down
4 changes: 2 additions & 2 deletions ipc/nano/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ impl<S> GuardedSocket<S> where S: WithSocket<Socket> {
}

impl<S> Deref for GuardedSocket<S> where S: WithSocket<Socket> {
type Target = S;
type Target = Arc<S>;

fn deref(&self) -> &S {
fn deref(&self) -> &Arc<S> {
Copy link
Contributor

@rphmeier rphmeier Jul 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am becoming wary of anything that uses &Arc<T> since it makes it too easy to perform unnecessary atomic operations. this kind of deref impl will also hurt readability because basically the only thing you can do with a &Arc<T> over a &T is call clone. But then it will look like you've cloned the GuardedSocket...and gotten back an Arc<S>?

why not have an explicit clone_client method?

edit: we do! it's called service

Copy link
Contributor Author

@NikVolf NikVolf Jul 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rphmeier because actual dispatch is implemented for Arc of T, but not for T

see here:
https://github.com/ethcore/parity/blob/master/ipc/codegen/src/codegen.rs#L838

impl $host_generics ::ipc::IpcInterface<$interface_endpoint> for ::std::sync::Arc<$interface_endpoint> $where_clause {
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the implementation do further cloning of self and sharing it between threads? if it doesn't then the impl should just be for T. that is strictly more flexible.

even so, that doesn't require this deref impl

Copy link
Contributor Author

@NikVolf NikVolf Jul 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because $interface_endpoint can be trait

Copy link
Contributor Author

@NikVolf NikVolf Jul 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the same time, say, EthSync can host multiple endpoints and you end up with confilcting implementations of IpcInterface for EthSync

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can implement the trait for the unsized trait object type as shown here: https://is.gd/1YaeHw

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably won't work when all this interfaces are in different crates
but don't know how to check that

&self.client
}
}
Expand Down
7 changes: 4 additions & 3 deletions parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::ClientIoMessage;
use ethsync::{EthSync, SyncProvider, ManageNetwork};
use ethsync::{SyncProvider, ManageNetwork};
use ethcore::account_provider::AccountProvider;
use util::{TimerToken, IoHandler, IoContext};

Expand All @@ -27,7 +27,8 @@ const INFO_TIMER: TimerToken = 0;

pub struct ClientIoHandler {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>,
pub info: Informant,
}
Expand All @@ -40,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer {
let sync_status = self.sync.status();
let network_config = self.sync.network_config();
let network_config = self.net.network_config();
self.info.tick(&self.client, Some((sync_status, network_config)));
}
}
Expand Down
26 changes: 16 additions & 10 deletions parity/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod migration;
mod signer;
mod rpc_apis;
mod url;
mod modules;

use std::io::{Write, Read, BufReader, BufRead};
use std::ops::Deref;
Expand All @@ -85,12 +86,11 @@ use rustc_serialize::hex::FromHex;
use ctrlc::CtrlC;
use util::{H256, ToPretty, PayloadInfo, Bytes, Colour, version, journaldb};
use util::panics::{MayPanic, ForwardPanic, PanicHandler};
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError,
ChainNotify, Mode};
use ethcore::client::{BlockID, BlockChainClient, ClientConfig, get_db_path, BlockImportError, Mode};
use ethcore::error::{ImportError};
use ethcore::service::ClientService;
use ethcore::spec::Spec;
use ethsync::{EthSync, NetworkConfiguration};
use ethsync::{NetworkConfiguration};
use ethcore::miner::{Miner, MinerService, ExternalMiner};
use migration::migrate;
use informant::Informant;
Expand Down Expand Up @@ -244,27 +244,32 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
let network_settings = Arc::new(conf.network_settings());

// Sync
let sync = EthSync::new(sync_config, client.clone(), NetworkConfiguration::from(net_settings))
.unwrap_or_else(|e| die_with_error("Sync", ethcore::error::Error::Util(e)));
service.set_notify(&(sync.clone() as Arc<ChainNotify>));
let (sync_provider, manage_network, chain_notify) =
modules::sync(sync_config, NetworkConfiguration::from(net_settings), client.clone())
.unwrap_or_else(|e| die_with_error("Sync", e));
//
// let sync = EthSync::new(sync_config, client.clone(), NetworkConfiguration::from(net_settings))
// .unwrap_or_else(|e| die_with_error("Sync", ethcore::error::Error::Util(e)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented out code

service.set_notify(&chain_notify);

// if network is active by default
if match conf.mode() { Mode::Dark(..) => false, _ => !conf.args.flag_no_network } {
sync.start();
chain_notify.start();
}

let deps_for_rpc_apis = Arc::new(rpc_apis::Dependencies {
signer_port: conf.signer_port(),
signer_queue: Arc::new(rpc_apis::ConfirmationsQueue::default()),
client: client.clone(),
sync: sync.clone(),
sync: sync_provider.clone(),
net: manage_network.clone(),
secret_store: account_service.clone(),
miner: miner.clone(),
external_miner: external_miner.clone(),
logger: logger.clone(),
settings: network_settings.clone(),
allow_pending_receipt_query: !conf.args.flag_geth,
net_service: sync.clone(),
net_service: manage_network.clone(),
});

let dependencies = rpc::Dependencies {
Expand Down Expand Up @@ -312,7 +317,8 @@ fn execute_client(conf: Configuration, spec: Spec, client_config: ClientConfig)
let io_handler = Arc::new(ClientIoHandler {
client: service.client(),
info: Informant::new(conf.have_color()),
sync: sync.clone(),
sync: sync_provider.clone(),
net: manage_network.clone(),
accounts: account_service.clone(),
});
service.register_io_handler(io_handler).expect("Error registering IO handler");
Expand Down
40 changes: 40 additions & 0 deletions parity/modules.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2015, 2016 Ethcore (UK) Ltd.
// This file is part of Parity.

// Parity is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ethsync::{EthSync, SyncProvider, ManageNetwork, SyncConfig, NetworkConfiguration};
use std::sync::Arc;
use ethcore::client::{ChainNotify, BlockChainClient};
use ethcore;

#[cfg(feature="ipc")]
pub fn sync(
sync_cfg: SyncConfig,
net_cfg: NetworkConfiguration,
client: Arc<BlockChainClient>)
-> Result<(Arc<SyncProvider>, Arc<ManageNetwork>, Arc<ChainNotify>), ethcore::error::Error>
{
}

#[cfg(not(feature="ipc"))]
pub fn sync(
sync_cfg: SyncConfig,
net_cfg: NetworkConfiguration,
client: Arc<BlockChainClient>)
-> Result<(Arc<SyncProvider>, Arc<ManageNetwork>, Arc<ChainNotify>), ethcore::error::Error>
{
let eth_sync = try!(EthSync::new(sync_cfg, client, net_cfg).map_err(|e| ethcore::error::Error::Util(e)));
Ok((eth_sync.clone() as Arc<SyncProvider>, eth_sync.clone() as Arc<ManageNetwork>, eth_sync.clone() as Arc<ChainNotify>))
}
Loading