Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1637 from ethcore/sync-svc
Browse files Browse the repository at this point in the history
Sync stand-alone binary and feature-gated dependencies refactoring
  • Loading branch information
NikVolf authored Jul 18, 2016
2 parents 3d00a91 + 028d6f6 commit 18f1661
Show file tree
Hide file tree
Showing 18 changed files with 288 additions and 67 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,18 @@ default = ["ui", "use-precompiled-js"]
ui = ["dapps", "ethcore-signer/ui"]
use-precompiled-js = ["ethcore-dapps/use-precompiled-js", "ethcore-signer/use-precompiled-js"]
dapps = ["ethcore-dapps"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev",
"ethcore-dapps/dev", "ethcore-signer/dev"]
dev = ["clippy", "ethcore/dev", "ethcore-util/dev", "ethsync/dev", "ethcore-rpc/dev", "ethcore-dapps/dev", "ethcore-signer/dev"]
travis-beta = ["ethcore/json-tests"]
travis-nightly = ["ethcore/json-tests", "dev"]

[[bin]]
path = "parity/main.rs"
name = "parity"

[[bin]]
path = "parity/sync/main.rs"
name = "sync"

[profile.release]
debug = true
lto = false
14 changes: 14 additions & 0 deletions ethcore/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ fn main() {
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}

// chain notify interface
{
let src = Path::new("src/client/chain_notify.rs");
let intermediate = Path::new(&out_dir).join("chain_notify.intermediate.rs.in");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &src, &intermediate).unwrap();

let dst = Path::new(&out_dir).join("chain_notify.ipc.rs");
let mut registry = syntex::Registry::new();
codegen::register(&mut registry);
registry.expand("", &intermediate, &dst).unwrap();
}
}
6 changes: 6 additions & 0 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use util::numbers::*;
use ipc::{IpcConfig, BinaryConvertError};
use std::collections::VecDeque;
use std::mem;

/// Represents what has to be handled by actor listening to chain events
#[derive(Ipc)]
pub trait ChainNotify : Send + Sync {
/// fires when chain has new blocks
fn new_blocks(&self,
Expand All @@ -38,3 +42,5 @@ pub trait ChainNotify : Send + Sync {
// does nothing by default
}
}

impl IpcConfig for ChainNotify { }
2 changes: 1 addition & 1 deletion ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1073,4 +1073,4 @@ impl MayPanic for Client {
}
}

impl IpcConfig<BlockChainClient> for Arc<BlockChainClient> { }
impl IpcConfig for BlockChainClient { }
8 changes: 7 additions & 1 deletion ethcore/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ mod config;
mod error;
mod test_client;
mod trace;
mod chain_notify;

pub use self::client::*;
pub use self::config::{Mode, ClientConfig, DatabaseCompactionProfile, BlockQueueConfig, BlockChainConfig, Switch, VMType};
Expand Down Expand Up @@ -60,6 +59,13 @@ pub mod client {
include!(concat!(env!("OUT_DIR"), "/client.ipc.rs"));
}

pub mod chain_notify {
//! Chain notify interface
#![allow(dead_code, unused_assignments, unused_variables, missing_docs)] // codegen issues
include!(concat!(env!("OUT_DIR"), "/chain_notify.ipc.rs"));
}

/// Blockchain database client. Owns and manages a blockchain and a block queue.
pub trait BlockChainClient : Sync + Send {

Expand Down
9 changes: 4 additions & 5 deletions ipc/codegen/src/codegen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use syntax::ast::{
MetaItem,
Item,
ImplItemKind,
ImplItem,
MethodSig,
Arg,
PatKind,
Expand Down Expand Up @@ -592,8 +591,8 @@ fn push_client_implementation(
let handshake_item = quote_impl_item!(cx,
pub fn handshake(&self) -> Result<(), ::ipc::Error> {
let payload = ::ipc::Handshake {
protocol_version: ::std::sync::Arc::<$endpoint>::protocol_version(),
api_version: ::std::sync::Arc::<$endpoint>::api_version(),
protocol_version: $endpoint::protocol_version(),
api_version: $endpoint::api_version(),
};

::ipc::invoke(
Expand Down Expand Up @@ -769,7 +768,7 @@ fn ty_ident_map(original_ty: &P<Ty>) -> IdentMap {
ident_map
}

/// implements `IpcInterface<C>` for the given class `C`
/// implements `IpcInterface` for the given class `C`
fn implement_interface(
cx: &ExtCtxt,
builder: &aster::AstBuilder,
Expand Down Expand Up @@ -835,7 +834,7 @@ fn implement_interface(
};

let ipc_item = quote_item!(cx,
impl $host_generics ::ipc::IpcInterface<$interface_endpoint> for ::std::sync::Arc<$interface_endpoint> $where_clause {
impl $host_generics ::ipc::IpcInterface for $interface_endpoint $where_clause {
fn dispatch<R>(&self, r: &mut R) -> Vec<u8>
where R: ::std::io::Read
{
Expand Down
53 changes: 29 additions & 24 deletions ipc/hypervisor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,52 +33,53 @@ use service::{HypervisorService, IpcModuleId};
use std::process::{Command,Child};
use std::collections::HashMap;

pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID};
pub use service::{HypervisorServiceClient, CLIENT_MODULE_ID, SYNC_MODULE_ID};

type BinaryId = &'static str;

const CLIENT_BINARY: BinaryId = "client";
pub type BinaryId = &'static str;

pub struct Hypervisor {
ipc_addr: String,
service: Arc<HypervisorService>,
ipc_worker: RwLock<nanoipc::Worker<HypervisorService>>,
processes: RwLock<HashMap<BinaryId, Child>>,
db_path: String,
modules: HashMap<IpcModuleId, (BinaryId, Vec<String>)>,
}

impl Hypervisor {
/// initializes the Hypervisor service with the open ipc socket for incoming clients
pub fn new(db_path: &str) -> Hypervisor {
Hypervisor::with_url(db_path, HYPERVISOR_IPC_URL)
pub fn new() -> Hypervisor {
Hypervisor::with_url(HYPERVISOR_IPC_URL)
}

pub fn module(mut self, module_id: IpcModuleId, binary_id: BinaryId, args: Vec<String>) -> Hypervisor {
self.modules.insert(module_id, (binary_id, args));
self.service.add_module(module_id);
self
}

/// Starts on the specified address for ipc listener
fn with_url(db_path: &str, addr: &str) -> Hypervisor{
Hypervisor::with_url_and_service(db_path, addr, HypervisorService::new())
pub fn local_module(self, module_id: IpcModuleId) -> Hypervisor {
self.service.add_module(module_id);
self
}

/// Starts with the specified address for the ipc listener and
/// the specified list of modules in form of created service
fn with_url_and_service(db_path: &str, addr: &str, service: Arc<HypervisorService>) -> Hypervisor {
pub fn with_url(addr: &str) -> Hypervisor {
let service = HypervisorService::new();
let worker = nanoipc::Worker::new(&service);
Hypervisor{
ipc_addr: addr.to_owned(),
service: service,
ipc_worker: RwLock::new(worker),
processes: RwLock::new(HashMap::new()),
db_path: db_path.to_owned(),
modules: HashMap::new(),
}
}

/// Since one binary can host multiple modules
/// we match binaries
fn match_module(module_id: &IpcModuleId) -> Option<BinaryId> {
match *module_id {
CLIENT_MODULE_ID => Some(CLIENT_BINARY),
// none means the module is inside the main binary
_ => None
}
fn match_module(&self, module_id: &IpcModuleId) -> Option<&(BinaryId, Vec<String>)> {
self.modules.get(module_id)
}

/// Creates IPC listener and starts all binaries
Expand All @@ -95,7 +96,7 @@ impl Hypervisor {
/// Does nothing when it is already started on module is inside the
/// main binary
fn start_module(&self, module_id: IpcModuleId) {
Self::match_module(&module_id).map(|binary_id| {
self.match_module(&module_id).map(|&(ref binary_id, ref binary_args)| {
let mut processes = self.processes.write().unwrap();
{
if processes.get(binary_id).is_some() {
Expand All @@ -108,7 +109,12 @@ impl Hypervisor {
executable_path.pop();
executable_path.push(binary_id);

let child = Command::new(&executable_path.to_str().unwrap()).arg(&self.db_path).spawn().unwrap_or_else(
let mut command = Command::new(&executable_path.to_str().unwrap());
for arg in binary_args { command.arg(arg); }

trace!(target: "hypervisor", "Spawn executable: {:?}", command);

let child = command.spawn().unwrap_or_else(
|e| panic!("Hypervisor cannot start binary ({:?}): {}", executable_path, e));
processes.insert(binary_id, child);
});
Expand All @@ -132,7 +138,7 @@ impl Hypervisor {

let mut childs = self.processes.write().unwrap();
for (ref mut binary, ref mut child) in childs.iter_mut() {
trace!(target: "hypervisor", "HYPERVISOR: Stopping process module: {}", binary);
trace!(target: "hypervisor", "Stopping process module: {}", binary);
child.kill().unwrap();
}
}
Expand All @@ -149,15 +155,14 @@ mod tests {
use super::*;
use std::sync::atomic::{AtomicBool,Ordering};
use std::sync::Arc;
use super::service::*;
use nanoipc;

#[test]
fn can_init() {
let url = "ipc:///tmp/test-parity-hypervisor-10.ipc";
let test_module_id = 8080u64;

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
assert_eq!(false, hypervisor.modules_ready());
}

Expand All @@ -177,7 +182,7 @@ mod tests {
client.module_ready(test_module_id);
});

let hypervisor = Hypervisor::with_url_and_service("", url, HypervisorService::with_modules(vec![test_module_id]));
let hypervisor = Hypervisor::with_url(url).local_module(test_module_id);
hypervisor.start();
hypervisor_ready_local.store(true, Ordering::Relaxed);
hypervisor.wait_for_startup();
Expand Down
11 changes: 9 additions & 2 deletions ipc/hypervisor/src/service.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub type IpcModuleId = u64;
/// Blockhain database module id
pub const CLIENT_MODULE_ID: IpcModuleId = 2000;

/// Sync module id
pub const SYNC_MODULE_ID: IpcModuleId = 2100;

/// IPC service that handles module management
pub struct HypervisorService {
check_list: RwLock<HashMap<IpcModuleId, bool>>,
Expand All @@ -43,7 +46,7 @@ impl HypervisorService {
impl HypervisorService {
/// New service with the default list of modules
pub fn new() -> Arc<HypervisorService> {
HypervisorService::with_modules(vec![CLIENT_MODULE_ID])
HypervisorService::with_modules(vec![])
}

/// New service with list of modules that will report for being ready
Expand All @@ -57,6 +60,10 @@ impl HypervisorService {
})
}

pub fn add_module(&self, module_id: IpcModuleId) {
self.check_list.write().unwrap().insert(module_id, false);
}

/// Number of modules still being waited for check-in
pub fn unchecked_count(&self) -> usize {
self.check_list.read().unwrap().iter().filter(|&(_, status)| !status).count()
Expand All @@ -68,4 +75,4 @@ impl HypervisorService {
}
}

impl ::ipc::IpcConfig<HypervisorService> for Arc<HypervisorService> {}
impl ::ipc::IpcConfig for HypervisorService {}
4 changes: 2 additions & 2 deletions ipc/nano/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const POLL_TIMEOUT: isize = 100;
const CLIENT_CONNECTION_TIMEOUT: isize = 2500;

/// Generic worker to handle service (binded) sockets
pub struct Worker<S: ?Sized> where Arc<S>: IpcInterface<S> {
pub struct Worker<S: ?Sized> where S: IpcInterface {
service: Arc<S>,
sockets: Vec<(Socket, Endpoint)>,
polls: Vec<PollFd>,
Expand Down Expand Up @@ -116,7 +116,7 @@ pub enum SocketError {
RequestLink,
}

impl<S: ?Sized> Worker<S> where Arc<S>: IpcInterface<S> {
impl<S: ?Sized> Worker<S> where S: IpcInterface {
/// New worker over specified `service`
pub fn new(service: &Arc<S>) -> Worker<S> {
Worker::<S> {
Expand Down
4 changes: 2 additions & 2 deletions ipc/rpc/src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct Handshake {

/// Allows to configure custom version and custom handshake response for
/// ipc host
pub trait IpcConfig<I: ?Sized> {
pub trait IpcConfig {
/// Current service api version
/// Should be increased if any of the methods changes signature
fn api_version() -> Version {
Expand Down Expand Up @@ -60,7 +60,7 @@ pub enum Error {

/// Allows implementor to be attached to generic worker and dispatch rpc requests
/// over IPC
pub trait IpcInterface<I :?Sized> : IpcConfig<I> {
pub trait IpcInterface : IpcConfig {
/// reads the message from io, dispatches the call and returns serialized result
fn dispatch<R>(&self, r: &mut R) -> Vec<u8> where R: Read;

Expand Down
7 changes: 4 additions & 3 deletions parity/io_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use std::sync::Arc;
use ethcore::client::Client;
use ethcore::service::ClientIoMessage;
use ethsync::{EthSync, SyncProvider, ManageNetwork};
use ethsync::{SyncProvider, ManageNetwork};
use ethcore::account_provider::AccountProvider;
use util::{TimerToken, IoHandler, IoContext};

Expand All @@ -27,7 +27,8 @@ const INFO_TIMER: TimerToken = 0;

pub struct ClientIoHandler {
pub client: Arc<Client>,
pub sync: Arc<EthSync>,
pub sync: Arc<SyncProvider>,
pub net: Arc<ManageNetwork>,
pub accounts: Arc<AccountProvider>,
pub info: Informant,
}
Expand All @@ -40,7 +41,7 @@ impl IoHandler<ClientIoMessage> for ClientIoHandler {
fn timeout(&self, _io: &IoContext<ClientIoMessage>, timer: TimerToken) {
if let INFO_TIMER = timer {
let sync_status = self.sync.status();
let network_config = self.sync.network_config();
let network_config = self.net.network_config();
self.info.tick(&self.client, Some((sync_status, network_config)));
}
}
Expand Down
Loading

0 comments on commit 18f1661

Please sign in to comment.