This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Allow dropping light client RPC query with no results #9318

Merged: 18 commits, Sep 12, 2018
195 changes: 170 additions & 25 deletions ethcore/light/src/on_demand/mod.rs
@@ -19,17 +19,18 @@
//! will take the raw data received here and extract meaningful results from it.

use std::cmp;
use std::collections::HashMap;
use std::collections::{HashMap, BTreeSet};
use std::marker::PhantomData;
use std::sync::Arc;

use ethcore::executed::{Executed, ExecutionError};

use futures::{Poll, Future};
use futures::sync::oneshot::{self, Receiver, Canceled};
use futures::{Poll, Future, Async};
use futures::sync::oneshot::{self, Receiver};
use network::PeerId;
use parking_lot::{RwLock, Mutex};
use rand;
use std::time::{Duration, SystemTime};

use net::{
self, Handler, PeerStatus, Status, Capabilities,
@@ -49,7 +50,45 @@ pub mod request;
/// The result of execution
pub type ExecutionResult = Result<Executed, ExecutionError>;

/// The default number of retries for an OnDemand query sent to other nodes
pub const DEFAULT_RETRY_COUNT: usize = 10;

/// The default time limit for an inactive (no new peer to connect to) OnDemand query (a zero duration means unlimited)
pub const DEFAULT_QUERY_TIME_LIMIT: Duration = Duration::from_millis(10000);

const NULL_DURATION: Duration = Duration::from_secs(0);

/// OnDemand related errors
pub mod error {
use futures::sync::oneshot::Canceled;

error_chain! {

foreign_links {
ChannelCanceled(Canceled) #[doc = "Canceled oneshot channel"];
}

errors {
#[doc = "Max number of on-demand query attempts reached without result."]
MaxAttemptReach(query_index: usize) {
description("On-demand query limit reached")
display("On-demand query limit reached on query #{}", query_index)
}

#[doc = "No reply from the current peer set; a timeout occurred while waiting for new peers for an additional query attempt."]
TimeoutOnNewPeers(query_index: usize, remaining_attempts: usize) {
description("Timeout for On-demand query")
display("Timeout for On-demand query; {} query attempts remain for query #{}", remaining_attempts, query_index)
}

}

}

}

// relevant peer info.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Peer {
status: Status,
capabilities: Capabilities,
@@ -74,13 +113,21 @@ impl Peer {
}
}


/// Either an array of responses or a single error.
type PendingResponse = self::error::Result<Vec<Response>>;

// Info on an attempted request, plus the sender used to deliver the received value.
struct Pending {
requests: basic_request::Batch<CheckedRequest>,
net_requests: basic_request::Batch<NetworkRequest>,
required_capabilities: Capabilities,
responses: Vec<Response>,
sender: oneshot::Sender<Vec<Response>>,
sender: oneshot::Sender<PendingResponse>,
base_query_index: usize,
remaining_query_count: usize,
query_id_history: BTreeSet<PeerId>,
Review comment (Member): Any reason to use BTreeSet instead of HashSet? We don't seem to rely on the order of the ids, so HashSet might be more appropriate?

Reply (Contributor, author): I expect it to be a small set, and I generally favor a BTreeSet in those cases.
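A minimal sketch of the trade-off under discussion, using small integer ids in place of PeerId (illustrative only):

    use std::collections::{BTreeSet, HashSet};

    fn main() {
        let peers = [3u64, 1, 2];

        // BTreeSet: no hashing, deterministic sorted iteration; the constant
        // factors are small, which suits a history of a handful of peer ids.
        let ordered: BTreeSet<u64> = peers.iter().copied().collect();
        assert_eq!(ordered.into_iter().collect::<Vec<_>>(), vec![1, 2, 3]);

        // HashSet: expected O(1) insert/contains, but unordered iteration
        // and a per-operation hashing cost.
        let hashed: HashSet<u64> = peers.iter().copied().collect();
        assert!(hashed.contains(&2));
    }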

inactive_time_limit: Option<SystemTime>,
}

impl Pending {
@@ -142,7 +189,9 @@ impl Pending {
// if the requests are complete, send the result and consume self.
fn try_complete(self) -> Option<Self> {
if self.requests.is_complete() {
let _ = self.sender.send(self.responses);
if self.sender.send(Ok(self.responses)).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on complete request at query #{}", self.query_id_history.len());
}
None
} else {
Some(self)
Expand Down Expand Up @@ -177,6 +226,25 @@ impl Pending {
self.net_requests = builder.build();
self.required_capabilities = capabilities;
}

// Report that no response was received; this results in an error.
// self is consumed on purpose.
fn no_response(self) {
trace!(target: "on_demand", "Dropping a pending query (no reply) at query #{}", self.query_id_history.len());
let err = self::error::ErrorKind::MaxAttemptReach(self.requests.num_answered());
if self.sender.send(Err(err.into())).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on no response");
}
}

// Report a timeout while waiting for new peers between query attempts.
fn time_out(self) {
trace!(target: "on_demand", "Dropping a pending query (no new peer time out) at query #{}", self.query_id_history.len());
let err = self::error::ErrorKind::TimeoutOnNewPeers(self.requests.num_answered(), self.query_id_history.len());
if self.sender.send(Err(err.into())).is_err() {
debug!(target: "on_demand", "Dropped oneshot channel receiver on time out");
}
}
}

// helper to guess capabilities required for a given batch of network requests.
@@ -230,16 +298,21 @@ fn guess_capabilities(requests: &[CheckedRequest]) -> Capabilities {
/// A future extracting the concrete output type of the generic adapter
/// from a vector of responses.
pub struct OnResponses<T: request::RequestAdapter> {
receiver: Receiver<Vec<Response>>,
receiver: Receiver<PendingResponse>,
_marker: PhantomData<T>,
}

impl<T: request::RequestAdapter> Future for OnResponses<T> {
type Item = T::Out;
type Error = Canceled;
type Error = self::error::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.receiver.poll().map(|async| async.map(T::extract_from))
match self.receiver.poll() {
Ok(Async::Ready(Ok(v))) => Ok(Async::Ready(T::extract_from(v))),
Ok(Async::Ready(Err(e))) => Err(e),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => Err(e.into()),
}
}
}

@@ -253,9 +326,12 @@ pub struct OnDemand {
in_transit: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
no_immediate_dispatch: bool,
base_retry_count: usize,
query_inactive_time_limit: Option<Duration>,
}

impl OnDemand {

/// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
OnDemand {
@@ -264,6 +340,8 @@ impl OnDemand {
in_transit: RwLock::new(HashMap::new()),
cache,
no_immediate_dispatch: false,
base_retry_count: DEFAULT_RETRY_COUNT,
query_inactive_time_limit: Some(DEFAULT_QUERY_TIME_LIMIT),
}
}

@@ -282,11 +360,11 @@ impl OnDemand {
/// Fails if back-references are not coherent.
/// The returned vector of responses will correspond to the requests exactly.
pub fn request_raw(&self, ctx: &BasicContext, requests: Vec<Request>)
-> Result<Receiver<Vec<Response>>, basic_request::NoSuchOutput>
-> Result<Receiver<PendingResponse>, basic_request::NoSuchOutput>
{
let (sender, receiver) = oneshot::channel();
if requests.is_empty() {
assert!(sender.send(Vec::new()).is_ok(), "receiver still in scope; qed");
assert!(sender.send(Ok(Vec::new())).is_ok(), "receiver still in scope; qed");
return Ok(receiver);
}

@@ -325,6 +403,10 @@ impl OnDemand {
required_capabilities: capabilities,
responses,
sender,
base_query_index: 0,
remaining_query_count: 0,
query_id_history: BTreeSet::new(),
inactive_time_limit: None,
});

Ok(receiver)
@@ -363,30 +445,68 @@
let peers = self.peers.read();
*pending = ::std::mem::replace(&mut *pending, Vec::new()).into_iter()
.filter(|pending| !pending.sender.is_canceled())
.filter_map(|pending| {
.filter_map(|mut pending| {
// the peer we dispatch to is chosen randomly
let num_peers = peers.len();
let rng = rand::random::<usize>() % cmp::max(num_peers, 1);
for (peer_id, peer) in peers.iter().chain(peers.iter()).skip(rng).take(num_peers) {
let history_len = pending.query_id_history.len();
let offset = if history_len == 0 {
pending.remaining_query_count = self.base_retry_count;
let rand = rand::random::<usize>();
pending.base_query_index = rand;
rand
} else {
pending.base_query_index + history_len
Review comment (Contributor): Perhaps make this an explicit wrapping add?

Reply (Contributor, author): I am not sure what you mean by an explicit wrapping add?

Reply (Collaborator, @niklasad1, Sep 4, 2018): I think Rob is referring to <integer>::wrapping_add(), see https://doc.rust-lang.org/std/primitive.usize.html#method.wrapping_add

Reply (Contributor, author): Oh, that's nice.
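For reference, a small sketch of the suggested change; the values are illustrative and the wrap-around shown assumes a 64-bit target:

    fn main() {
        // A randomly chosen base index can sit near usize::MAX, so the plain
        // `base_query_index + history_len` above can overflow and panic in
        // debug builds.
        let base_query_index = usize::MAX;
        let history_len = 2usize;
        let num_peers = 5usize;

        // wrapping_add makes the modular intent explicit and never panics.
        let offset = base_query_index.wrapping_add(history_len) % num_peers;
        assert_eq!(offset, 1); // usize::MAX + 2 wraps around to 1
    }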

} % cmp::max(num_peers, 1);
Review comment (Contributor): If we lose peers along the way we might end up re-querying some of them. But generally the peer set will be decently stable, so maybe not the worst thing.

Reply (Contributor, author): I also test the set insertion a few lines below, so it should be fine.
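The check referred to is the return value of query_id_history.insert in the dispatch loop below; a minimal sketch of the idiom (illustrative ids):

    use std::collections::BTreeSet;

    fn main() {
        // Peer 7 reappears, e.g. after churn in the peer set.
        let candidate_peers = [7u32, 9, 7, 11];
        let mut query_id_history: BTreeSet<u32> = BTreeSet::new();

        for peer_id in candidate_peers.iter() {
            // `insert` returns false for a peer that was already queried,
            // so a repeated candidate is skipped rather than re-queried.
            if query_id_history.insert(*peer_id) {
                println!("dispatching to peer {}", peer_id);
            } else {
                println!("peer {} already queried, skipping", peer_id);
            }
        }
    }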

let init_remaining_query_count = pending.remaining_query_count; // used to detect a large reduction in the number of peers
for (peer_id, peer) in peers.iter().chain(peers.iter())
.skip(offset).take(num_peers) {
// TODO: see which requests can be answered by the cache?

if !peer.can_fulfill(&pending.required_capabilities) {
continue
if pending.remaining_query_count == 0 {
break
}

match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Dispatched request {} to peer {}", req_id, peer_id);
self.in_transit.write().insert(req_id, pending);
return None
if pending.query_id_history.insert(peer_id.clone()) {

if !peer.can_fulfill(&pending.required_capabilities) {
trace!(target: "on_demand", "Peer {} without required capabilities, skipping, {} remaining attempts", peer_id, pending.remaining_query_count);
continue
}

pending.remaining_query_count -= 1;
pending.inactive_time_limit = None;

match ctx.request_from(*peer_id, pending.net_requests.clone()) {
Ok(req_id) => {
trace!(target: "on_demand", "Dispatched request {} to peer {}, {} remaining attempts", req_id, peer_id, pending.remaining_query_count);
self.in_transit.write().insert(req_id, pending);
return None
}
Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
}
Err(net::Error::NoCredits) | Err(net::Error::NotServer) => {}
Err(e) => debug!(target: "on_demand", "Error dispatching request to peer: {}", e),
}
}

// TODO: maximum number of failures _when we have peers_.
Some(pending)
if pending.remaining_query_count == 0 {
pending.no_response();
None
} else if init_remaining_query_count == pending.remaining_query_count {
if let Some(query_inactive_time_limit) = self.query_inactive_time_limit {
let now = SystemTime::now();
if let Some(inactive_time_limit) = pending.inactive_time_limit {
if now > inactive_time_limit {
pending.time_out();
return None
}
} else {
debug!(target: "on_demand", "No more peers to query, waiting for {} seconds until dropping query", query_inactive_time_limit.as_secs());
pending.inactive_time_limit = Some(now + query_inactive_time_limit);
}
}
Some(pending)
} else {
Some(pending)
}
})
.collect(); // `pending` now contains all requests we couldn't dispatch.

@@ -406,6 +526,21 @@ impl OnDemand {
self.attempt_dispatch(ctx);
}
}

/// Set the retry count for a query.
pub fn default_retry_number(&mut self, nb_retry: usize) {
self.base_retry_count = nb_retry;
}

/// Set the time limit for a query.
pub fn query_inactive_time_limit(&mut self, inactive_time_limit: Duration) {
self.query_inactive_time_limit = if inactive_time_limit == NULL_DURATION {
None
} else {
Some(inactive_time_limit)
};
}

}
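A hedged usage sketch of the two new setters; the configure_on_demand helper is hypothetical, assumed to live alongside this module's OnDemand and Cache types, with the cache construction elided:

    use std::sync::Arc;
    use std::time::Duration;
    use parking_lot::Mutex;

    // Sketch only: tune the new retry/timeout knobs on a fresh service.
    fn configure_on_demand(cache: Arc<Mutex<Cache>>) -> OnDemand {
        let mut on_demand = OnDemand::new(cache);
        // Allow 20 dispatch attempts per query instead of DEFAULT_RETRY_COUNT (10).
        on_demand.default_retry_number(20);
        // Drop a query after 30 seconds with no new peers to try; a zero
        // Duration (NULL_DURATION) disables the inactivity limit entirely.
        on_demand.query_inactive_time_limit(Duration::from_secs(30));
        on_demand
    }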

impl Handler for OnDemand {
Expand Down Expand Up @@ -458,6 +593,16 @@ impl Handler for OnDemand {
None => return,
};

if responses.is_empty() {
if pending.remaining_query_count == 0 {
pending.no_response();
return;
}
} else {
// do not keep the query counter for other elements of this batch
pending.query_id_history.clear();
}

// for each incoming response
// 1. ensure verification data filled.
// 2. pending.requests.supply_response
2 changes: 1 addition & 1 deletion ethcore/res/wasm-tests