Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Sync stand-alone binary and feature-gated dependencies refactoring #1637

Merged
merged 19 commits into from
Jul 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,18 @@ default = ["ui", "use-precompiled-js"]
ui = ["dapps", "ethcore-signer/ui"]
use-precompiled-js = ["ethcore-dapps/use-precompiled-js", "ethcore-signer/use-precompiled-js"]
dapps = ["ethcore-dapps"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev",
"ethcore-dapps/dev", "ethcore-signer/dev"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"]
travis-beta = ["ethcore/json-tests"]
travis-nightly = ["ethcore/json-tests", "dev"]

[[bin]]
path = "parity/main.rs"
name = "parity"

[[bin]]
path = "parity/sync/main.rs"
name = "sync"

[profile.release]
debug = true
lto = false
14 changes: 14 additions & 0 deletions ethcore/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ fn main() {
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}

// chain notify interface
{
let src = Path::new("src/client/chain_notify.rs");
let intermediate = Path::new(&out_dir).join("chain_notify.intermediate.rs.in");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &src, &intermediate).unwrap();

let dst = Path::new(&out_dir).join("chain_notify.ipc.rs");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}
}
6 changes: 6 additions & 0 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::numbers::*;
use ipc::{IpcConfig, BinaryConvertError};
use std::collections::VecDeque;
use std::mem;

/// Represents what has to be handled by actor listening to chain events
#[derive(Ipc)]
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks
fn new_blocks(&self,
Expand All @@ -38,3 +42,5 @@ pub trait ChainNotify : Send + Sync {
// does nothing by default
}
}

impl IpcConfig for ChainNotify { }
2 changes: 1 addition & 1 deletion ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,4 +1073,4 @@ impl MayPanic for Client {
}
}

impl IpcConfig<BlockChainClient> for Arc<BlockChainClient> { }
impl IpcConfig for BlockChainClient { }
8 changes: 7 additions & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod config;
mod error;
mod test_client;
mod trace;
mod chain_notify;

pub use self::client::*;
pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType};
Expand Down Expand Up @@ -60,6 +59,13 @@ pub mod client {
include!(concat!(env!("OUT_DIR"), "/client.ipc.rs"));
}

pub mod chain_notify {
//! Chain notify interface

#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/chain_notify.ipc.rs"));
}

/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send {

Expand Down
9 changes: 4 additions & 5 deletions ipc/codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use syntax::ast::{
MetaItem,
Item,
ImplItemKind,
ImplItem,
MethodSig,
Arg,
PatKind,
Expand Down Expand Up @@ -592,8 +591,8 @@ fn push_client_implementation(
let handshake_item = quote_impl_item!(cx,
pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = ::ipc::Handshake {
protocol_version: ::std::sync::Arc::<$endpoint>::protocol_version(),
api_version: ::std::sync::Arc::<$endpoint>::api_version(),
protocol_version: $endpoint::protocol_version(),
api_version: $endpoint::api_version(),
};

::ipc::invoke(
Expand Down Expand Up @@ -769,7 +768,7 @@ fn ty_ident_map(original_ty: &P<Ty>) -> IdentMap {
ident_map
}

/// implements `IpcInterface<C>` for the given class `C`
/// implements `IpcInterface` for the given class `C`
fn implement_interface(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
Expand Down Expand Up @@ -835,7 +834,7 @@ fn implement_interface(
};

let ipc_item = quote_item!(cx,
impl $host_generics ::ipc::IpcInterface<$interface_endpoint> for ::std::sync::Arc<$interface_endpoint> $where_clause {
impl $host_generics ::ipc::IpcInterface for $interface_endpoint $where_clause {
fn dispatch<R>(&self, r: &mut R) -> Vec<u8>
where R: ::std::io::Read
{
Expand Down
53 changes: 29 additions & 24 deletions ipc/hypervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,52 +33,53 @@ use service::{HypervisorService, IpcModuleId};
use std::process::{Command,Child};
use std::collections::HashMap;

pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID};
pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};

type BinaryId = &'static str;

const CLIENT_BINARY: BinaryId = "client";
pub type BinaryId = &'static str;

pub struct Hypervisor {
ipc_addr: String,
service: Arc<HypervisorService>,
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
processes: RwLock<HashMap<BinaryId, Child>>,
db_path: String,
modules: HashMap<IpcModuleId, (BinaryId, Vec<String>)>,
}

impl Hypervisor {
/// initializes the Hypervisor service with the open ipc socket for incoming clients
pub fn new(db_path: &str) -> Hypervisor {
Hypervisor::with_url(db_path, HYPERVISOR_IPC_URL)
pub fn new() -> Hypervisor {
Hypervisor::with_url(HYPERVISOR_IPC_URL)
}

pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec<String>) -> Hypervisor {
self.modules.insert(module_id, (binary_id, args));
self.service.add_module(module_id);
self
}

/// Starts on the specified address for ipc listener
fn with_url(db_path: &str, addr: &str) -> Hypervisor{
Hypervisor::with_url_and_service(db_path, addr, HypervisorService::new())
pub fn local_module(self, module_id: IpcModuleId) -> Hypervisor {
self.service.add_module(module_id);
self
}

/// Starts with the specified address for the ipc listener and
/// the specified list of modules in form of created service
fn with_url_and_service(db_path: &str, addr: &str, service: Arc<HypervisorService>) -> Hypervisor {
pub fn with_url(addr: &str) -> Hypervisor {
let service = HypervisorService::new();
let worker = nanoipc::Worker::new(&service);
Hypervisor{
ipc_addr: addr.to_owned(),
service: service,
ipc_worker: RwLock::new(worker),
processes: RwLock::new(HashMap::new()),
db_path: db_path.to_owned(),
modules: HashMap::new(),
}
}

/// Since one binary can host multiple modules
/// we match binaries
fn match_module(module_id: &IpcModuleId) -> Option<BinaryId> {
match *module_id {
CLIENT_MODULE_ID => Some(CLIENT_BINARY),
// none means the module is inside the main binary
_ => None
}
fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec<String>)> {
self.modules.get(module_id)
}

/// Creates IPC listener and starts all binaries
Expand All @@ -95,7 +96,7 @@ impl Hypervisor {
/// Does nothing when it is already started on module is inside the
/// main binary
fn start_module(&self, module_id: IpcModuleId) {
Self::match_module(&module_id).map(|binary_id| {
self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| {
let mut processes = self.processes.write().unwrap();
{
if processes.get(binary_id).is_some() {
Expand All @@ -108,7 +109,12 @@ impl Hypervisor {
executable_path.pop();
executable_path.push(binary_id);

let child = Command::new(&executable_path.to_str().unwrap()).arg(&self.db_path).spawn().unwrap_or_else(
let mut command = Command::new(&executable_path.to_str().unwrap());
for arg in binary_args { command.arg(arg); }

trace!(target: "hypervisor", "Spawn executable: {:?}", command);

let child = command.spawn().unwrap_or_else(
|e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e));
processes.insert(binary_id, child);
});
Expand All @@ -132,7 +138,7 @@ impl Hypervisor {

let mut childs = self.processes.write().unwrap();
for (ref mut binary, ref mut child) in childs.iter_mut() {
trace!(target: "hypervisor", "HYPERVISOR: Stopping process module: {}", binary);
trace!(target: "hypervisor", "Stopping process module: {}", binary);
child.kill().unwrap();
}
}
Expand All @@ -149,15 +155,14 @@ mod tests {
use super::*;
use std::sync::atomic::{AtomicBool,Ordering};
use std::sync::Arc;
use super::service::*;
use nanoipc;

#[test]
fn can_init() {
let url = "ipc:///tmp/test-parity-hypervisor-10.ipc";
let test_module_id = 8080u64;

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
assert_eq!(false, hypervisor.modules_ready());
}

Expand All @@ -177,7 +182,7 @@ mod tests {
client.module_ready(test_module_id);
});

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
hypervisor.start();
hypervisor_ready_local.store(true, Ordering::Relaxed);
hypervisor.wait_for_startup();
Expand Down
11 changes: 9 additions & 2 deletions ipc/hypervisor/src/service.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub type IpcModuleId = u64;
/// Blockhain database module id
pub const CLIENT_MODULE_ID: IpcModuleId = 2000;

/// Sync module id
pub const SYNC_MODULE_ID: IpcModuleId = 2100;

/// IPC service that handles module management
pub struct HypervisorService {
check_list: RwLock<HashMap<IpcModuleId, bool>>,
Expand All @@ -43,7 +46,7 @@ impl HypervisorService {
impl HypervisorService {
/// New service with the default list of modules
pub fn new() -> Arc<HypervisorService> {
HypervisorService::with_modules(vec![CLIENT_MODULE_ID])
HypervisorService::with_modules(vec![])
}

/// New service with list of modules that will report for being ready
Expand All @@ -57,6 +60,10 @@ impl HypervisorService {
})
}

pub fn add_module(&self, module_id: IpcModuleId) {
self.check_list.write().unwrap().insert(module_id, false);
}

/// Number of modules still being waited for check-in
pub fn unchecked_count(&self) -> usize {
self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
Expand All @@ -68,4 +75,4 @@ impl HypervisorService {
}
}

impl ::ipc::IpcConfig<HypervisorService> for Arc<HypervisorService> {}
impl ::ipc::IpcConfig for HypervisorService {}
4 changes: 2 additions & 2 deletions ipc/nano/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const POLL_TIMEOUT: isize = 100;
const CLIENT_CONNECTION_TIMEOUT: isize = 2500;

/// Generic worker to handle service (binded) sockets
pub struct Worker<S: ?Sized> where Arc<S>: IpcInterface<S> {
pub struct Worker<S: ?Sized> where S: IpcInterface {
service: Arc<S>,
sockets: Vec<(Socket, Endpoint)>,
polls: Vec<PollFd>,
Expand Down Expand Up @@ -116,7 +116,7 @@ pub enum SocketError {
RequestLink,
}

impl<S: ?Sized> Worker<S> where Arc<S>: IpcInterface<S> {
impl<S: ?Sized> Worker<S> where S: IpcInterface {
/// New worker over specified `service`
pub fn new(service: &Arc<S>) -> Worker<S> {
Worker::<S> {
Expand Down
4 changes: 2 additions & 2 deletions ipc/rpc/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct Handshake {

/// Allows to configure custom version and custom handshake response for
/// ipc host
pub trait IpcConfig<I: ?Sized> {
pub trait IpcConfig {
/// Current service api version
/// Should be increased if any of the methods changes signature
fn api_version() -> Version {
Expand Down Expand Up @@ -60,7 +60,7 @@ pub enum Error {

/// Allows implementor to be attached to generic worker and dispatch rpc requests
/// over IPC
pub trait IpcInterface<I :?Sized> : IpcConfig<I> {
pub trait IpcInterface : IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;

Expand Down
7 changes: 4 additions & 3 deletions parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::ClientIoMessage;
use ethsync::{EthSync, SyncProvider, ManageNetwork};
use ethsync::{SyncProvider, ManageNetwork};
use ethcore::account_provider::AccountProvider;
use util::{TimerToken, IoHandler, IoContext};

Expand All @@ -27,7 +27,8 @@ const INFO_TIMER: TimerToken = 0;

pub struct ClientIoHandler {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>,
pub info: Informant,
}
Expand All @@ -40,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer {
let sync_status = self.sync.status();
let network_config = self.sync.network_config();
let network_config = self.net.network_config();
self.info.tick(&self.client, Some((sync_status, network_config)));
}
}
Expand Down
Loading