Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Filters and block RPCs for the light client #5320

Merged
merged 13 commits into from
Apr 12, 2017
38 changes: 38 additions & 0 deletions ethcore/light/src/client/header_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,22 @@ impl HeaderChain {
}
}

/// Get a block's hash by ID. In the case of query by number, only canonical results
/// will be returned.
pub fn block_hash(&self, id: BlockId) -> Option<H256> {
match id {
BlockId::Earliest => Some(self.genesis_hash()),
BlockId::Hash(hash) => Some(hash),
BlockId::Number(num) => {
if self.best_block.read().number < num { return None }
self.candidates.read().get(&num).map(|entry| entry.canonical_hash)
}
BlockId::Latest | BlockId::Pending => {
Some(self.best_block.read().hash)
}
}
}

/// Get a block header. In the case of query by number, only canonical blocks
/// will be returned.
pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
Expand Down Expand Up @@ -395,6 +411,28 @@ impl HeaderChain {
}
}

/// Get a block's chain score.
/// Returns nothing for non-canonical blocks.
pub fn score(&self, id: BlockId) -> Option<U256> {
let genesis_hash = self.genesis_hash();
match id {
BlockId::Earliest | BlockId::Number(0) => Some(self.genesis_header.difficulty()),
BlockId::Hash(hash) if hash == genesis_hash => Some(self.genesis_header.difficulty()),
BlockId::Hash(hash) => match self.block_header(BlockId::Hash(hash)) {
Some(header) => self.candidates.read().get(&header.number())
.and_then(|era| era.candidates.iter().find(|e| e.hash == hash))
.map(|c| c.total_difficulty),
None => None,
},
BlockId::Number(num) => {
let candidates = self.candidates.read();
if self.best_block.read().number < num { return None }
candidates.get(&num).map(|era| era.candidates[0].total_difficulty)
}
BlockId::Latest | BlockId::Pending => Some(self.best_block.read().total_difficulty)
}
}

/// Get the best block's header.
pub fn best_header(&self) -> encoded::Header {
self.block_header(BlockId::Latest).expect("Header for best block always stored; qed")
Expand Down
26 changes: 25 additions & 1 deletion ethcore/light/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use ethcore::service::ClientIoMessage;
use ethcore::encoded;
use io::IoChannel;

use util::{H256, Mutex, RwLock};
use util::{H256, U256, Mutex, RwLock};
use util::kvdb::{KeyValueDB, CompactionProfile};

use self::header_chain::{AncestryIter, HeaderChain};
Expand Down Expand Up @@ -65,12 +65,18 @@ pub trait LightChainClient: Send + Sync {
/// parent queued prior.
fn queue_header(&self, header: Header) -> Result<H256, BlockImportError>;

/// Attempt to get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<H256>;

/// Attempt to get block header by block id.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;

/// Get the best block header.
fn best_block_header(&self) -> encoded::Header;

/// Get a block's chain score by ID.
fn score(&self, id: BlockId) -> Option<U256>;

/// Get an iterator over a block and its ancestry.
fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box<Iterator<Item=encoded::Header> + 'a>;

Expand Down Expand Up @@ -181,6 +187,11 @@ impl Client {
self.queue.queue_info()
}

/// Attempt to get a block hash by block id.
pub fn block_hash(&self, id: BlockId) -> Option<H256> {
self.chain.block_hash(id)
}

/// Get a block header by Id.
pub fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.chain.block_header(id)
Expand All @@ -191,6 +202,11 @@ impl Client {
self.chain.best_header()
}

/// Get a block's chain score.
pub fn score(&self, id: BlockId) -> Option<U256> {
self.chain.score(id)
}

/// Get an iterator over a block and its ancestry.
pub fn ancestry_iter(&self, start: BlockId) -> AncestryIter {
self.chain.ancestry_iter(start)
Expand Down Expand Up @@ -308,6 +324,10 @@ impl LightChainClient for Client {
self.import_header(header)
}

fn block_hash(&self, id: BlockId) -> Option<H256> {
Client::block_hash(self, id)
}

fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
Client::block_header(self, id)
}
Expand All @@ -316,6 +336,10 @@ impl LightChainClient for Client {
Client::best_block_header(self)
}

fn score(&self, id: BlockId) -> Option<U256> {
Client::score(self, id)
}

fn ancestry_iter<'a>(&'a self, start: BlockId) -> Box<Iterator<Item=encoded::Header> + 'a> {
Box::new(Client::ancestry_iter(self, start))
}
Expand Down
27 changes: 19 additions & 8 deletions ethcore/light/src/on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use futures::sync::oneshot::{self, Sender, Receiver};
use network::PeerId;
use rlp::RlpStream;
use util::{Bytes, RwLock, Mutex, U256, H256};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY_LIST_RLP};
use util::sha3::{SHA3_NULL_RLP, SHA3_EMPTY, SHA3_EMPTY_LIST_RLP};

use net::{self, Handler, Status, Capabilities, Announcement, EventContext, BasicContext, ReqId};
use cache::Cache;
Expand Down Expand Up @@ -83,7 +83,7 @@ enum Pending {
HeaderByHash(request::HeaderByHash, Sender<encoded::Header>),
Block(request::Body, Sender<encoded::Block>),
BlockReceipts(request::BlockReceipts, Sender<Vec<Receipt>>),
Account(request::Account, Sender<Option<BasicAccount>>),
Account(request::Account, Sender<BasicAccount>),
Code(request::Code, Sender<Bytes>),
TxProof(request::TransactionProof, Sender<Result<Executed, ExecutionError>>),
}
Expand Down Expand Up @@ -136,18 +136,20 @@ pub struct OnDemand {
pending_requests: RwLock<HashMap<ReqId, Pending>>,
cache: Arc<Mutex<Cache>>,
orphaned_requests: RwLock<Vec<Pending>>,
start_nonce: U256,
}

const RECEIVER_IN_SCOPE: &'static str = "Receiver is still in scope, so it's not dropped; qed";

impl OnDemand {
/// Create a new `OnDemand` service with the given cache.
pub fn new(cache: Arc<Mutex<Cache>>) -> Self {
pub fn new(cache: Arc<Mutex<Cache>>, account_start_nonce: U256) -> Self {
OnDemand {
peers: RwLock::new(HashMap::new()),
pending_requests: RwLock::new(HashMap::new()),
cache: cache,
orphaned_requests: RwLock::new(Vec::new()),
start_nonce: account_start_nonce,
}
}

Expand Down Expand Up @@ -268,7 +270,7 @@ impl OnDemand {

/// Request an account by address and block header -- which gives a hash to query and a state root
/// to verify against.
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<Option<BasicAccount>> {
pub fn account(&self, ctx: &BasicContext, req: request::Account) -> Receiver<BasicAccount> {
let (sender, receiver) = oneshot::channel();
self.dispatch(ctx, Pending::Account(req, sender));
receiver
Expand All @@ -279,7 +281,7 @@ impl OnDemand {
let (sender, receiver) = oneshot::channel();

// fast path for no code.
if req.code_hash == ::util::sha3::SHA3_EMPTY {
if req.code_hash == SHA3_EMPTY {
sender.send(Vec::new()).expect(RECEIVER_IN_SCOPE)
} else {
self.dispatch(ctx, Pending::Code(req, sender));
Expand Down Expand Up @@ -497,10 +499,19 @@ impl Handler for OnDemand {
Pending::Account(req, sender) => {
if let NetworkResponse::Account(ref response) = *response {
match req.check_response(&response.proof) {
Ok(maybe_account) => {
Ok(account) => {
let account = account.unwrap_or_else(|| {
BasicAccount {
balance: 0.into(),
nonce: self.start_nonce,
code_hash: SHA3_EMPTY,
storage_root: SHA3_NULL_RLP
}
});

// TODO: validate against request outputs.
// needs engine + env info as part of request.
let _ = sender.send(maybe_account);
let _ = sender.send(account);
return
}
Err(e) => warn!(target: "on_demand", "Error handling response for state request: {:?}", e),
Expand Down Expand Up @@ -572,7 +583,7 @@ mod tests {
#[test]
fn detects_hangup() {
let cache = Arc::new(Mutex::new(Cache::new(Default::default(), Duration::hours(6))));
let on_demand = OnDemand::new(cache);
let on_demand = OnDemand::new(cache, 0.into());
let result = on_demand.header_by_hash(&FakeContext, request::HeaderByHash(H256::default()));

assert!(on_demand.orphaned_requests.read().len() == 1);
Expand Down
6 changes: 6 additions & 0 deletions ethcore/src/types/encoded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ impl Block {
/// Decode to a full block.
pub fn decode(&self) -> FullBlock { ::rlp::decode(&self.0) }

/// Decode the header.
pub fn decode_header(&self) -> FullHeader { self.rlp().val_at(0) }

/// Clone the encoded header.
pub fn header(&self) -> Header { Header(self.rlp().at(0).as_raw().to_vec()) }

/// Get the rlp of this block.
#[inline]
pub fn rlp(&self) -> Rlp {
Expand Down
4 changes: 1 addition & 3 deletions parity/light_helpers/queue_cull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,14 @@ impl IoHandler<ClientIoMessage> for QueueCull {

let (sync, on_demand, txq) = (self.sync.clone(), self.on_demand.clone(), self.txq.clone());
let best_header = self.client.best_block_header();
let start_nonce = self.client.engine().account_start_nonce();

info!(target: "cull", "Attempting to cull queued transactions from {} senders.", senders.len());
self.remote.spawn_with_timeout(move || {
let maybe_fetching = sync.with_context(move |ctx| {
// fetch the nonce of each sender in the queue.
let nonce_futures = senders.iter()
.map(|&address| request::Account { header: best_header.clone(), address: address })
.map(|request| on_demand.account(ctx, request))
.map(move |fut| fut.map(move |x| x.map(|acc| acc.nonce).unwrap_or(start_nonce)))
.map(|request| on_demand.account(ctx, request).map(|acc| acc.nonce))
.zip(senders.iter())
.map(|(fut, &addr)| fut.map(move |nonce| (addr, nonce)));

Expand Down
7 changes: 3 additions & 4 deletions parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl Dependencies for FullDependencies {
);
handler.extend_with(client.to_delegate());

let filter_client = EthFilterClient::new(&self.client, &self.miner);
let filter_client = EthFilterClient::new(self.client.clone(), self.miner.clone());
handler.extend_with(filter_client.to_delegate());

add_signing_methods!(EthSigning, handler, self);
Expand Down Expand Up @@ -377,9 +377,8 @@ impl Dependencies for LightDependencies {
self.secret_store.clone(),
self.cache.clone(),
);
handler.extend_with(client.to_delegate());

// TODO: filters.
handler.extend_with(Eth::to_delegate(client.clone()));
handler.extend_with(EthFilter::to_delegate(client));
add_signing_methods!(EthSigning, handler, self);
},
Api::Personal => {
Expand Down
3 changes: 2 additions & 1 deletion parity/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ fn execute_light(cmd: RunCmd, can_restart: bool, logger: Arc<RotatingLogger>) ->
let cache = Arc::new(::util::Mutex::new(cache));

// start on_demand service.
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone()));
let account_start_nonce = service.client().engine().account_start_nonce();
let on_demand = Arc::new(::light::on_demand::OnDemand::new(cache.clone(), account_start_nonce));

// set network path.
net_conf.net_config_path = Some(db_dirs.network_path().to_string_lossy().into_owned());
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl LightDispatcher {

match nonce_future {
Some(x) =>
x.map(|acc| acc.map_or_else(Default::default, |acc| acc.nonce))
x.map(|acc| acc.nonce)
.map_err(|_| errors::no_light_peers())
.boxed(),
None => future::err(errors::network_disabled()).boxed()
Expand Down
10 changes: 5 additions & 5 deletions rpc/src/v1/impls/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,23 +544,23 @@ impl<C, SN: ?Sized, S: ?Sized, M, EM> Eth for EthClient<C, SN, S, M, EM> where
Err(errors::deprecated("Compilation functionality is deprecated.".to_string()))
}

fn logs(&self, filter: Filter) -> Result<Vec<Log>, Error> {
fn logs(&self, filter: Filter) -> BoxFuture<Vec<Log>, Error> {
let include_pending = filter.to_block == Some(BlockNumber::Pending);
let filter: EthcoreFilter = filter.into();
let mut logs = take_weak!(self.client).logs(filter.clone())
let mut logs = take_weakf!(self.client).logs(filter.clone())
.into_iter()
.map(From::from)
.collect::<Vec<Log>>();

if include_pending {
let best_block = take_weak!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weak!(self.miner), best_block, &filter);
let best_block = take_weakf!(self.client).chain_info().best_block_number;
let pending = pending_logs(&*take_weakf!(self.miner), best_block, &filter);
logs.extend(pending);
}

let logs = limit_logs(logs, filter.limit);

Ok(logs)
future::ok(logs).boxed()
}

fn work(&self, no_new_work_timeout: Trailing<u64>) -> Result<Work, Error> {
Expand Down
Loading