This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Allow dropping light client RPC query with no results #9318

Merged: 18 commits, Sep 12, 2018
Changes from 8 commits
145 changes: 124 additions & 21 deletions ethcore/light/src/on_demand/mod.rs
@@ -19,17 +19,18 @@
//! will take the raw data received here and extract meaningful results from it.

use std::cmp;
use std::collections::HashMap;
use std::collections::{HashMap, BTreeSet};
use std::marker::PhantomData;
use std::sync::Arc;

use ethcore::executed::{Executed, ExecutionError};

use futures::{Poll, Future};
use futures::{Poll, Future, Async};
use futures::sync::oneshot::{self, Receiver, Canceled};
use network::PeerId;
use parking_lot::{RwLock, Mutex};
use rand;
use std::time::{Duration, SystemTime};

use net::{
self, Handler, PeerStatus, Status, Capabilities,
@@ -49,7 +50,14 @@ pub mod request;
/// The result of execution
pub type ExecutionResult = Result<Executed, ExecutionError>;

/// The default number of retry for OnDemand queries send to other nodes
Collaborator: The default number of retries for OnDemand queries to send to the other nodes

pub const DEFAULT_NB_RETRY: usize = 10;
Collaborator: I'd name this DEFAULT_RETRY_COUNT to be consistent.


/// The default time limit in milliseconds for inactive (no new peer to connect to) OnDemand queries (0 for unlimited)
pub const DEFAULT_QUERY_TIME_LIMIT: u64 = 10000;
Collaborator: Use const Duration here instead!
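A minimal sketch of that suggestion, assuming a toolchain where `Duration::from_millis` is usable in const context (hypothetical, not the code in this revision):

```rust
use std::time::Duration;

// Hypothetical: the inactive-query limit as a typed constant, so call
// sites no longer carry an implicit "milliseconds" unit.
pub const DEFAULT_QUERY_TIME_LIMIT: Duration = Duration::from_millis(10_000);
```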


// relevant peer info.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Peer {
status: Status,
capabilities: Capabilities,
@@ -69,8 +77,8 @@ impl Peer {
};

local_caps.serve_headers >= request.serve_headers &&
can_serve_since(request.serve_chain_since, local_caps.serve_chain_since) &&
can_serve_since(request.serve_state_since, local_caps.serve_state_since)
can_serve_since(request.serve_chain_since, local_caps.serve_chain_since) &&
can_serve_since(request.serve_state_since, local_caps.serve_state_since)
}
}

@@ -81,6 +89,10 @@ struct Pending {
required_capabilities: Capabilities,
responses: Vec<Response>,
sender: oneshot::Sender<Vec<Response>>,
base_query_index: usize,
remaining_query_count: usize,
query_id_history: BTreeSet<PeerId>,
Member: Any reason to use BTreeSet instead of HashSet? We don't rely on id order, it seems, so a HashSet might be more appropriate?

Contributor Author: I expect it to be a small set, and I generally favor a BTreeSet in those cases.
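For illustration, this is how `query_id_history` drives the duplicate check in the dispatch loop further down (a standalone sketch; `usize` stands in for the crate's `PeerId`):

```rust
use std::collections::BTreeSet;

fn main() {
    // `insert` returns false when the peer was already queried, which is
    // exactly the duplicate test the dispatch loop relies on.
    let mut query_id_history: BTreeSet<usize> = BTreeSet::new();
    assert!(query_id_history.insert(7));  // first attempt at peer 7
    assert!(!query_id_history.insert(7)); // repeat attempt is rejected
}
```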

inactive_time_limit: Option<SystemTime>,
}

impl Pending {
@@ -145,7 +157,9 @@ impl Pending {
// if the requests are complete, send the result and consume self.
fn try_complete(self) -> Option<Self> {
if self.requests.is_complete() {
let _ = self.sender.send(self.responses);
if self.sender.send(self.responses).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on complet request");
Collaborator: complete?

}
None
} else {
Some(self)
@@ -180,6 +194,15 @@ impl Pending {
self.net_requests = builder.build();
self.required_capabilities = capabilities;
}

// returning no reponse, it will result in an error.
// self is consumed on purpose.
fn no_response(self) {
trace!(target: "on_demand", "Dropping a pending query (no reply)");
Collaborator: query index here if possible?

if self.sender.send(Vec::with_capacity(0)).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on no response");
Collaborator: Should this be "…on time out"?

Contributor Author: Oops, totally.

}
}
}

// helper to guess capabilities required for a given batch of network requests.
@@ -240,7 +263,16 @@ impl<T: request::RequestAdapter> Future for OnResponses<T> {
type Error = Canceled;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll().map(|async| async.map(T::extract_from))
match self.receiver.poll() {
Ok(Async::Ready(v)) => {
if v.is_empty() {
return Err(Canceled);
}
Ok(Async::Ready(T::extract_from(v)))
},
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e),
}
}
}
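An illustration of the convention adopted above, under the futures 0.1 API this crate uses: an empty response vector sent through the oneshot channel is surfaced to the caller as `Canceled` (a standalone sketch, not the crate's code):

```rust
extern crate futures; // futures = "0.1", the API this crate targets

use futures::Future;
use futures::sync::oneshot::{self, Canceled};

// Standalone sketch: an empty Vec stands for "no response", and the
// receiving side maps it onto the same error a dropped sender would give.
fn main() {
    let (tx, rx) = oneshot::channel::<Vec<u32>>();
    tx.send(Vec::new()).expect("receiver is still alive");

    let outcome = rx
        .and_then(|v| if v.is_empty() { Err(Canceled) } else { Ok(v) })
        .wait();
    assert_eq!(outcome, Err(Canceled));
}
```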

@@ -254,9 +286,12 @@ pub struct OnDemand {
in_transit: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
no_immediate_dispatch: bool,
base_retry_number: usize,
Collaborator: Perhaps base_retry_count for consistency?

query_inactive_time_limit: Option<Duration>,
}

impl OnDemand {

/// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
OnDemand {
@@ -265,6 +300,8 @@ impl OnDemand {
in_transit: RwLock::new(HashMap::new()),
cache: cache,
no_immediate_dispatch: false,
base_retry_number: DEFAULT_NB_RETRY,
query_inactive_time_limit: Some(Duration::from_millis(DEFAULT_QUERY_TIME_LIMIT)),
}
}

@@ -326,6 +363,10 @@ impl OnDemand {
required_capabilities: capabilities,
responses: responses,
sender: sender,
base_query_index: 0,
remaining_query_count: 0,
query_id_history: BTreeSet::new(),
inactive_time_limit: None,
});

Ok(receiver)
@@ -364,30 +405,67 @@ impl OnDemand {
let peers = self.peers.read();
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
.filter(|pending| !pending.sender.is_canceled())
.filter_map(|pending| {
.filter_map(|mut pending| {
// the peer we dispatch to is chosen randomly
let num_peers = peers.len();
let rng = rand::random::<usize>() % cmp::max(num_peers, 1);
for (peer_id, peer) in peers.iter().chain(peers.iter()).skip(rng).take(num_peers) {
let history_len = pending.query_id_history.len();
let start = if history_len == 0 {
Collaborator: Would peer_offset be a better name than start? I think .skip(start) below reads a bit weird.

pending.remaining_query_count = self.base_retry_number;
let rand = rand::random::<usize>();
pending.base_query_index = rand;
rand
} else {
pending.base_query_index + history_len
Contributor: perhaps make this an explicit wrapping add?

Contributor Author: I am not sure what you mean by an explicit wrapping add?

Collaborator (@niklasad1, Sep 4, 2018): I think Rob is referring to <integer>::wrapping_add(), see https://doc.rust-lang.org/std/primitive.usize.html#method.wrapping_add

Contributor Author: Oh, it's nice.
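A runnable sketch of what the explicit wrapping add could look like for this computation (names borrowed from the diff, not the merged code):

```rust
use std::cmp;

// Sketch: an explicit wrapping add makes overflow of the random base
// index well-defined even in debug builds, where plain `+` would panic.
fn start_offset(base_query_index: usize, history_len: usize, num_peers: usize) -> usize {
    base_query_index.wrapping_add(history_len) % cmp::max(num_peers, 1)
}

fn main() {
    // Overflow wraps instead of panicking:
    assert_eq!(start_offset(usize::max_value(), 2, 5), 1);
}
```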

} % cmp::max(num_peers, 1);
Contributor: if we lose peers along the way we might end up re-querying some of them. but generally the peer set will be decently stable so maybe not the worst thing

Contributor Author: I also did a test on hashset insertion a few lines after, so it should be fine.

let init_remaining_query_count = pending.remaining_query_count; // to fail in case of big reduction of nb of peers
for (peer_id, peer) in peers.iter().chain(peers.iter())
.skip(start).take(num_peers) {
// TODO: see which requests can be answered by the cache?

if !peer.can_fulfill(&pending.required_capabilities) {
continue
if pending.remaining_query_count == 0 {
break
}

match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Dispatched request {} to peer {}", req_id, peer_id);
self.in_transit.write().insert(req_id, pending);
return None
if pending.query_id_history.insert(peer_id.clone()) {

if !peer.can_fulfill(&pending.required_capabilities) {
trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count);
continue
}

pending.remaining_query_count -= 1;
pending.inactive_time_limit = None;

match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Dispatched request {} to peer {}, {} remaining attempts", req_id, peer_id, pending.remaining_query_count);
self.in_transit.write().insert(req_id, pending);
return None
}
Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
}
Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
}
}

// TODO: maximum number of failures _when we have peers_.
Some(pending)
if pending.remaining_query_count == 0 {
pending.no_response();
None
} else if init_remaining_query_count == pending.remaining_query_count {
if let Some(query_inactive_time_limit) = self.query_inactive_time_limit {
let now = SystemTime::now();
if pending.inactive_time_limit.is_none() {
debug!(target: "on_demand", "No more peer to query, waiting for {} seconds until dropping query", query_inactive_time_limit.as_secs());
Collaborator: s/No more peer/No more peers/

pending.inactive_time_limit = Some(now + query_inactive_time_limit);
} else {
if now > pending.inactive_time_limit.unwrap() {
Contributor: we don't allow unwrap in the codebase. You should use expect with a proof that it is not going to panic. Maybe rewrite this block with an if let Some(x) = pending.inactive_time_limit?
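A minimal sketch of that rewrite, with the deadline test pulled into a standalone helper (hypothetical names, not the merged code):

```rust
use std::time::{Duration, SystemTime};

// Sketch of the reviewer's suggestion: test the recorded deadline via
// `if let` instead of `unwrap()`, so a missing limit can never panic.
fn past_deadline(inactive_time_limit: Option<SystemTime>, now: SystemTime) -> bool {
    if let Some(limit) = inactive_time_limit {
        now > limit
    } else {
        false
    }
}

fn main() {
    let now = SystemTime::now();
    assert!(!past_deadline(None, now));
    assert!(past_deadline(Some(now - Duration::from_secs(1)), now));
}
```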

return None
}
}
}
Some(pending)
} else {
Some(pending)
}
})
.collect(); // `pending` now contains all requests we couldn't dispatch.

@@ -407,6 +485,21 @@ impl OnDemand {
self.attempt_dispatch(ctx);
}
}

/// Changes default number of retry for query.
Collaborator: "Set the retry count for a query"

pub fn default_retry_number(&mut self, nb_retry: usize) {
self.base_retry_number = nb_retry;
}

/// Changes default time limit for query.
Collaborator: "Set the time limit for a query"

pub fn query_inactive_time_limit(&mut self, inactive_time_limit: u64) {
self.query_inactive_time_limit = if inactive_time_limit == 0 {
None
} else {
Some(Duration::from_millis(inactive_time_limit))
};
}

}

impl Handler for OnDemand {
@@ -459,6 +552,16 @@ impl Handler for OnDemand {
None => return,
};

if responses.len() == 0 {
Collaborator: use is_empty() instead of len() == 0

if pending.remaining_query_count == 0 {
pending.no_response();
return;
}
} else {
// do not keep query counter for others elements of this batch
pending.query_id_history.clear();
}

// for each incoming response
// 1. ensure verification data filled.
// 2. pending.requests.supply_response
2 changes: 1 addition & 1 deletion ethcore/light/src/on_demand/tests.rs
@@ -180,7 +180,7 @@ fn no_capabilities() {

harness.inject_peer(peer_id, Peer {
status: dummy_status(),
capabilities: capabilities,
capabilities: capabilities.clone(),
Collaborator: You can remove the clone here; capabilities is Copy!

});

let _recv = harness.service.request_raw(
31 changes: 29 additions & 2 deletions parity/cli/mod.rs
@@ -562,6 +562,15 @@ usage! {
"--ipfs-api-cors=[URL]",
"Specify CORS header for IPFS API responses. Special options: \"all\", \"none\".",

["Light Client Options"]
ARG arg_on_demand_nb_retry: (Option<usize>) = None, or |c: &Config| c.light.as_ref()?.on_demand_nb_retry,
Collaborator: retry_count is better than nb_retry

Collaborator: All other options prefer - to _. I think these should be consistent with the rest.

Collaborator: Please specify all default values and unit of measure where relevant (e.g. "seconds")

Contributor Author: Yes, I missed the - usage; it is major (displayed in parity --help).

"--on_demand_nb_retry=[RETRIES]",
"Specify maximum number of retry query to send to other node for a query.",
Collaborator: "Set the query retry count"


ARG arg_on_demand_inactive_time_limit: (Option<u64>) = None, or |c: &Config| c.light.as_ref()?.on_demand_inactive_time_limit,
"--on_demand_inactive_time_limit=[MS]",
"Specify maximum total time limit for a light client query to stay inactive. O for no limit.",
Collaborator: "Set light client query time out (seconds). 0 for no limit"
Q: Does 0 really mean "wait forever for a response"?

Contributor Author: It is currently in milliseconds ([MS]), and yes, 0 means no limit, wait forever; it is not that unsafe because it is combined with the maximum number of retries (at first I implemented it without any timeout).

Contributor Author: Here it is not a hard time limit, it is really an inactive time (if new peers are added regularly, the request could run for way longer than this value). Inactive means there was no new query for some time; if a new query comes in between, the inactive time is reset to 0.
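A sketch of the bookkeeping the author describes, mirroring the diff's `inactive_time_limit` handling (simplified names, not the merged code):

```rust
use std::time::{Duration, SystemTime};

// Sketch of the "inactive time" semantics described above: the deadline
// is armed only while no peer accepts the query, and every successful
// dispatch clears it, so this is an inactivity window rather than a hard
// overall timeout.
struct QueryTimer {
    inactive_deadline: Option<SystemTime>,
}

impl QueryTimer {
    // A peer accepted the request: activity observed, reset the window.
    fn on_dispatched(&mut self) {
        self.inactive_deadline = None;
    }

    // No peer could take the request; returns true once the query should drop.
    fn on_no_peer(&mut self, limit: Duration) -> bool {
        let now = SystemTime::now();
        match self.inactive_deadline {
            None => {
                self.inactive_deadline = Some(now + limit);
                false
            }
            Some(deadline) => now > deadline,
        }
    }
}

fn main() {
    let mut t = QueryTimer { inactive_deadline: None };
    assert!(!t.on_no_peer(Duration::from_millis(0))); // arms the deadline
    t.on_dispatched(); // activity: window reset
    assert!(t.inactive_deadline.is_none());
}
```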


["Secret Store Options"]
FLAG flag_no_secretstore: (bool) = false, or |c: &Config| c.secretstore.as_ref()?.disable.clone(),
"--no-secretstore",
@@ -875,7 +884,7 @@ usage! {
"Target size of the whisper message pool in megabytes.",

["Legacy Options"]
// Options that are hidden from config, but are still unique for its functionality.
// Options that are hidden from config, but are still unique for its functionality.
Collaborator: Weird whitespace diff here


FLAG flag_geth: (bool) = false, or |_| None,
"--geth",
@@ -1100,6 +1109,7 @@ struct Config {
misc: Option<Misc>,
stratum: Option<Stratum>,
whisper: Option<Whisper>,
light: Option<Light>,
}

#[derive(Default, Debug, PartialEq, Deserialize)]
@@ -1364,12 +1374,21 @@ struct Whisper {
pool_size: Option<usize>,
}

#[derive(Default, Debug, PartialEq, Deserialize)]
Collaborator: Make this Copy and Clone?

#[serde(deny_unknown_fields)]
struct Light {
on_demand_nb_retry: Option<usize>,
Collaborator: on_demand_retry_count

on_demand_inactive_time_limit: Option<u64>,
}



Collaborator: Stray duplicate empty lines, one should be enough!

#[cfg(test)]
mod tests {
use super::{
Args, ArgsError,
Config, Operating, Account, Ui, Network, Ws, Rpc, Ipc, Dapps, Ipfs, Mining, Footprint,
Snapshots, Misc, Whisper, SecretStore,
Snapshots, Misc, Whisper, SecretStore, Light,
};
use toml;
use clap::{ErrorKind as ClapErrorKind};
@@ -1772,6 +1791,10 @@ mod tests {
arg_snapshot_at: "latest".into(),
flag_no_periodic_snapshot: false,

// -- Light options.
arg_on_demand_nb_retry: Some(15),
arg_on_demand_inactive_time_limit: Some(15000),

// -- Whisper options.
flag_whisper: false,
arg_whisper_pool_size: 20,
@@ -2019,6 +2042,10 @@ mod tests {
scale_verifiers: Some(false),
num_verifiers: None,
}),
light: Some(Light {
on_demand_nb_retry: Some(12),
on_demand_inactive_time_limit: Some(20000),
}),
snapshots: Some(Snapshots {
disable_periodic: Some(true),
}),
4 changes: 4 additions & 0 deletions parity/cli/tests/config.full.toml
@@ -155,6 +155,10 @@ fat_db = "auto"
scale_verifiers = true
num_verifiers = 6

[light]
on_demand_nb_retry = 15
on_demand_inactive_time_limit = 15000

[snapshots]
disable_periodic = false

4 changes: 4 additions & 0 deletions parity/cli/tests/config.toml
@@ -70,6 +70,10 @@ db_compaction = "ssd"
fat_db = "off"
scale_verifiers = false

[light]
on_demand_nb_retry = 12
on_demand_inactive_time_limit = 20000

[snapshots]
disable_periodic = true
