Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Commit 386cdb8

Browse files
rphmeierarkpar
authored andcommitted
Back-references for the on-demand service (#5573)
* header back-references for on demand * initial back-reference implementation for on demand requests * answer requests from cache * answer requests from cache, add tests * strongly typed responses for vectors of homogeneous requests * fix fallout in RPC without optimizing
1 parent aa41b48 commit 386cdb8

File tree

9 files changed

+782
-430
lines changed

9 files changed

+782
-430
lines changed

ethcore/light/src/on_demand/mod.rs

+152-219
Large diffs are not rendered by default.

ethcore/light/src/on_demand/request.rs

+299-92
Large diffs are not rendered by default.

ethcore/light/src/on_demand/tests.rs

+110-10
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use ::request::{self as basic_request, Response};
2828

2929
use std::sync::Arc;
3030

31-
use super::{request, OnDemand, Peer};
31+
use super::{request, OnDemand, Peer, HeaderRef};
3232

3333
// useful contexts to give the service.
3434
enum Context {
@@ -122,7 +122,10 @@ fn dummy_capabilities() -> Capabilities {
122122
#[test]
123123
fn detects_hangup() {
124124
let on_demand = Harness::create().service;
125-
let result = on_demand.header_by_hash(&Context::NoOp, request::HeaderByHash(H256::default()));
125+
let result = on_demand.request_raw(
126+
&Context::NoOp,
127+
vec![request::HeaderByHash(H256::default().into()).into()],
128+
);
126129

127130
assert_eq!(on_demand.pending.read().len(), 1);
128131
drop(result);
@@ -148,7 +151,7 @@ fn single_request() {
148151

149152
let recv = harness.service.request_raw(
150153
&Context::NoOp,
151-
vec![request::HeaderByHash(header.hash()).into()]
154+
vec![request::HeaderByHash(header.hash().into()).into()]
152155
).unwrap();
153156

154157
assert_eq!(harness.service.pending.read().len(), 1);
@@ -182,7 +185,7 @@ fn no_capabilities() {
182185

183186
let _recv = harness.service.request_raw(
184187
&Context::NoOp,
185-
vec![request::HeaderByHash(Default::default()).into()]
188+
vec![request::HeaderByHash(H256::default().into()).into()]
186189
).unwrap();
187190

188191
assert_eq!(harness.service.pending.read().len(), 1);
@@ -209,7 +212,7 @@ fn reassign() {
209212

210213
let recv = harness.service.request_raw(
211214
&Context::NoOp,
212-
vec![request::HeaderByHash(header.hash()).into()]
215+
vec![request::HeaderByHash(header.hash().into()).into()]
213216
).unwrap();
214217

215218
assert_eq!(harness.service.pending.read().len(), 1);
@@ -264,8 +267,8 @@ fn partial_response() {
264267
let recv = harness.service.request_raw(
265268
&Context::NoOp,
266269
vec![
267-
request::HeaderByHash(header1.hash()).into(),
268-
request::HeaderByHash(header2.hash()).into(),
270+
request::HeaderByHash(header1.hash().into()).into(),
271+
request::HeaderByHash(header2.hash().into()).into(),
269272
],
270273
).unwrap();
271274

@@ -323,8 +326,8 @@ fn part_bad_part_good() {
323326
let recv = harness.service.request_raw(
324327
&Context::NoOp,
325328
vec![
326-
request::HeaderByHash(header1.hash()).into(),
327-
request::HeaderByHash(header2.hash()).into(),
329+
request::HeaderByHash(header1.hash().into()).into(),
330+
request::HeaderByHash(header2.hash().into()).into(),
328331
],
329332
).unwrap();
330333

@@ -378,7 +381,7 @@ fn wrong_kind() {
378381

379382
let _recv = harness.service.request_raw(
380383
&Context::NoOp,
381-
vec![request::HeaderByHash(Default::default()).into()]
384+
vec![request::HeaderByHash(H256::default().into()).into()]
382385
).unwrap();
383386

384387
assert_eq!(harness.service.pending.read().len(), 1);
@@ -395,3 +398,100 @@ fn wrong_kind() {
395398

396399
assert_eq!(harness.service.pending.read().len(), 1);
397400
}
401+
402+
#[test]
403+
fn back_references() {
404+
let harness = Harness::create();
405+
406+
let peer_id = 10101;
407+
let req_id = ReqId(14426);
408+
409+
harness.inject_peer(peer_id, Peer {
410+
status: dummy_status(),
411+
capabilities: dummy_capabilities(),
412+
});
413+
414+
let header = Header::default();
415+
let encoded = encoded::Header::new(header.rlp(Seal::With));
416+
417+
let recv = harness.service.request_raw(
418+
&Context::NoOp,
419+
vec![
420+
request::HeaderByHash(header.hash().into()).into(),
421+
request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(),
422+
]
423+
).unwrap();
424+
425+
assert_eq!(harness.service.pending.read().len(), 1);
426+
427+
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));
428+
429+
assert_eq!(harness.service.pending.read().len(), 0);
430+
431+
harness.service.on_responses(
432+
&Context::WithPeer(peer_id),
433+
req_id,
434+
&[
435+
Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }),
436+
Response::Receipts(basic_request::ReceiptsResponse { receipts: vec![] }),
437+
]
438+
);
439+
440+
assert!(recv.wait().is_ok());
441+
}
442+
443+
#[test]
444+
#[should_panic]
445+
fn bad_back_reference() {
446+
let harness = Harness::create();
447+
448+
let header = Header::default();
449+
450+
let _ = harness.service.request_raw(
451+
&Context::NoOp,
452+
vec![
453+
request::HeaderByHash(header.hash().into()).into(),
454+
request::BlockReceipts(HeaderRef::Unresolved(1, header.hash().into())).into(),
455+
]
456+
).unwrap();
457+
}
458+
459+
#[test]
460+
fn fill_from_cache() {
461+
let harness = Harness::create();
462+
463+
let peer_id = 10101;
464+
let req_id = ReqId(14426);
465+
466+
harness.inject_peer(peer_id, Peer {
467+
status: dummy_status(),
468+
capabilities: dummy_capabilities(),
469+
});
470+
471+
let header = Header::default();
472+
let encoded = encoded::Header::new(header.rlp(Seal::With));
473+
474+
let recv = harness.service.request_raw(
475+
&Context::NoOp,
476+
vec![
477+
request::HeaderByHash(header.hash().into()).into(),
478+
request::BlockReceipts(HeaderRef::Unresolved(0, header.hash().into())).into(),
479+
]
480+
).unwrap();
481+
482+
assert_eq!(harness.service.pending.read().len(), 1);
483+
484+
harness.service.dispatch_pending(&Context::RequestFrom(peer_id, req_id));
485+
486+
assert_eq!(harness.service.pending.read().len(), 0);
487+
488+
harness.service.on_responses(
489+
&Context::WithPeer(peer_id),
490+
req_id,
491+
&[
492+
Response::Headers(basic_request::HeadersResponse { headers: vec![encoded] }),
493+
]
494+
);
495+
496+
assert!(recv.wait().is_ok());
497+
}

ethcore/light/src/types/request/builder.rs

+39-14
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! supplied as well.
2020
2121
use std::collections::HashMap;
22+
use std::ops::{Deref, DerefMut};
2223
use request::{
2324
IncompleteRequest, OutputKind, Output, NoSuchOutput, ResponseError, ResponseLike,
2425
};
@@ -124,23 +125,14 @@ impl<T: IncompleteRequest + Clone> Requests<T> {
124125
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
125126
}
126127
}
127-
}
128-
129-
impl<T: super::CheckedRequest> Requests<T> {
130-
/// Supply a response for the next request.
131-
/// Fails on: wrong request kind, all requests answered already.
132-
pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response)
133-
-> Result<T::Extract, ResponseError<T::Error>>
134-
{
135-
let idx = self.answered;
136128

137-
// check validity.
138-
if self.is_complete() { return Err(ResponseError::Unexpected) }
139-
140-
let extracted = self.requests[idx]
141-
.check_response(env, response).map_err(ResponseError::Validity)?;
129+
/// Supply a response, asserting its correctness.
130+
/// Fill outputs based upon it.
131+
pub fn supply_response_unchecked<R: ResponseLike>(&mut self, response: &R) {
132+
if self.is_complete() { return }
142133

143134
let outputs = &mut self.outputs;
135+
let idx = self.answered;
144136
response.fill_outputs(|out_idx, output| {
145137
// we don't need to check output kinds here because all back-references
146138
// are validated in the builder.
@@ -154,7 +146,26 @@ impl<T: super::CheckedRequest> Requests<T> {
154146
if let Some(ref mut req) = self.requests.get_mut(self.answered) {
155147
req.fill(|req_idx, out_idx| outputs.get(&(req_idx, out_idx)).cloned().ok_or(NoSuchOutput))
156148
}
149+
}
150+
}
151+
152+
impl<T: super::CheckedRequest + Clone> Requests<T> {
153+
/// Supply a response for the next request.
154+
/// Fails on: wrong request kind, all requests answered already.
155+
pub fn supply_response(&mut self, env: &T::Environment, response: &T::Response)
156+
-> Result<T::Extract, ResponseError<T::Error>>
157+
{
158+
let idx = self.answered;
159+
160+
// check validity.
161+
if idx == self.requests.len() { return Err(ResponseError::Unexpected) }
162+
let completed = self.next_complete()
163+
.expect("only fails when all requests have been answered; this just checked against; qed");
164+
165+
let extracted = self.requests[idx]
166+
.check_response(&completed, env, response).map_err(ResponseError::Validity)?;
157167

168+
self.supply_response_unchecked(response);
158169
Ok(extracted)
159170
}
160171
}
@@ -182,6 +193,20 @@ impl Requests<super::Request> {
182193
}
183194
}
184195

196+
impl<T: IncompleteRequest> Deref for Requests<T> {
197+
type Target = [T];
198+
199+
fn deref(&self) -> &[T] {
200+
&self.requests[..]
201+
}
202+
}
203+
204+
impl<T: IncompleteRequest> DerefMut for Requests<T> {
205+
fn deref_mut(&mut self) -> &mut [T] {
206+
&mut self.requests[..]
207+
}
208+
}
209+
185210
#[cfg(test)]
186211
mod tests {
187212
use request::*;

ethcore/light/src/types/request/mod.rs

+26-3
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ pub enum ResponseError<T> {
8383
}
8484

8585
/// An input to a request.
86-
#[derive(Debug, Clone, PartialEq, Eq)]
86+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8787
pub enum Field<T> {
8888
/// A pre-specified input.
8989
Scalar(T),
@@ -93,6 +93,29 @@ pub enum Field<T> {
9393
}
9494

9595
impl<T> Field<T> {
96+
/// Helper for creating a new back-reference field.
97+
pub fn back_ref(idx: usize, req: usize) -> Self {
98+
Field::BackReference(idx, req)
99+
}
100+
101+
/// map a scalar into some other item.
102+
pub fn map<F, U>(self, f: F) -> Field<U> where F: FnOnce(T) -> U {
103+
match self {
104+
Field::Scalar(x) => Field::Scalar(f(x)),
105+
Field::BackReference(req, idx) => Field::BackReference(req, idx),
106+
}
107+
}
108+
109+
/// Attempt to get a reference to the inner scalar.
110+
pub fn as_ref(&self) -> Option<&T> {
111+
match *self {
112+
Field::Scalar(ref x) => Some(x),
113+
Field::BackReference(_, _) => None,
114+
}
115+
}
116+
117+
118+
96119
// attempt conversion into scalar value.
97120
fn into_scalar(self) -> Result<T, NoSuchOutput> {
98121
match self {
@@ -400,7 +423,7 @@ impl CheckedRequest for Request {
400423
type Error = WrongKind;
401424
type Environment = ();
402425

403-
fn check_response(&self, _: &(), response: &Response) -> Result<(), WrongKind> {
426+
fn check_response(&self, _: &Self::Complete, _: &(), response: &Response) -> Result<(), WrongKind> {
404427
if self.kind() == response.kind() {
405428
Ok(())
406429
} else {
@@ -587,7 +610,7 @@ pub trait CheckedRequest: IncompleteRequest {
587610
type Environment;
588611

589612
/// Check whether the response matches (beyond the type).
590-
fn check_response(&self, &Self::Environment, &Self::Response) -> Result<Self::Extract, Self::Error>;
613+
fn check_response(&self, &Self::Complete, &Self::Environment, &Self::Response) -> Result<Self::Extract, Self::Error>;
591614
}
592615

593616
/// A response-like object.

parity/light_helpers/queue_cull.rs

+21-17
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use light::client::Client;
2727
use light::on_demand::{request, OnDemand};
2828
use light::TransactionQueue;
2929

30-
use futures::{future, stream, Future, Stream};
30+
use futures::{future, Future};
3131

3232
use parity_reactor::Remote;
3333

@@ -73,28 +73,32 @@ impl IoHandler<ClientIoMessage> for QueueCull {
7373
self.remote.spawn_with_timeout(move || {
7474
let maybe_fetching = sync.with_context(move |ctx| {
7575
// fetch the nonce of each sender in the queue.
76-
let nonce_futures = senders.iter()
77-
.map(|&address| request::Account { header: best_header.clone(), address: address })
78-
.map(move |request| {
79-
on_demand.account(ctx, request)
80-
.map(move |maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
76+
let nonce_reqs = senders.iter()
77+
.map(|&address| request::Account { header: best_header.clone().into(), address: address })
78+
.collect::<Vec<_>>();
79+
80+
// when they come in, update each sender to the new nonce.
81+
on_demand.request(ctx, nonce_reqs)
82+
.expect("No back-references; therefore all back-references are valid; qed")
83+
.map(move |accs| {
84+
let txq = txq.write();
85+
let _ = accs.into_iter()
86+
.map(|maybe_acc| maybe_acc.map_or(start_nonce, |acc| acc.nonce))
87+
.zip(senders)
88+
.fold(txq, |mut txq, (nonce, addr)| {
89+
txq.cull(addr, nonce);
90+
txq
91+
});
8192
})
82-
.zip(senders.iter())
83-
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));
84-
85-
// as they come in, update each sender to the new nonce.
86-
stream::futures_unordered(nonce_futures)
87-
.fold(txq, |txq, (address, nonce)| {
88-
txq.write().cull(address, nonce);
89-
future::ok(txq)
90-
})
91-
.map(|_| ()) // finally, discard the txq handle and log errors.
9293
.map_err(|_| debug!(target: "cull", "OnDemand prematurely closed channel."))
9394
});
9495

9596
match maybe_fetching {
9697
Some(fut) => fut.boxed(),
97-
None => future::ok(()).boxed(),
98+
None => {
99+
debug!(target: "cull", "Unable to acquire network context; qed");
100+
future::ok(()).boxed()
101+
}
98102
}
99103
}, Duration::from_millis(PURGE_TIMEOUT_MS), || {})
100104
}

0 commit comments

Comments
 (0)