Skip to content
This repository was archived by the owner on Nov 6, 2020. It is now read-only.

Logs Pub-Sub #5705

Merged
merged 6 commits into from
Jun 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion parity/rpc_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,13 @@ impl LightDependencies {
}
},
Api::EthPubSub => {
let client = EthPubSubClient::new(self.client.clone(), self.remote.clone());
let client = EthPubSubClient::light(
self.client.clone(),
self.on_demand.clone(),
self.sync.clone(),
self.cache.clone(),
self.remote.clone(),
);
self.client.add_listener(
Arc::downgrade(&client.handler()) as Weak<::light::client::LightChainNotify>
);
Expand Down
62 changes: 61 additions & 1 deletion rpc/src/v1/helpers/light_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use ethcore::basic_account::BasicAccount;
use ethcore::encoded;
use ethcore::executed::{Executed, ExecutionError};
use ethcore::ids::BlockId;
use ethcore::filter::Filter as EthcoreFilter;
use ethcore::transaction::{Action, Transaction as EthTransaction};

use futures::{future, Future, BoxFuture};
Expand All @@ -38,7 +39,7 @@ use ethsync::LightSync;
use util::{Address, Mutex, U256};

use v1::helpers::{CallRequest as CallRequestHelper, errors, dispatch};
use v1::types::{BlockNumber, CallRequest};
use v1::types::{BlockNumber, CallRequest, Log};

/// Helper for fetching blockchain data either from the light client or the network
/// as necessary.
Expand Down Expand Up @@ -259,4 +260,63 @@ impl LightFetch {
None => future::err(errors::network_disabled()).boxed()
}
}

/// get transaction logs
pub fn logs(&self, filter: EthcoreFilter) -> BoxFuture<Vec<Log>, Error> {
use std::collections::BTreeMap;

use futures::stream::{self, Stream};

const NO_INVALID_BACK_REFS: &'static str = "Fails only on invalid back-references; back-references here known to be valid; qed";

// early exit for "to" block before "from" block.
let best_number = self.client.chain_info().best_block_number;
let block_number = |id| match id {
BlockId::Earliest => Some(0),
BlockId::Latest | BlockId::Pending => Some(best_number),
BlockId::Hash(h) => self.client.block_header(BlockId::Hash(h)).map(|hdr| hdr.number()),
BlockId::Number(x) => Some(x),
};

match (block_number(filter.to_block), block_number(filter.from_block)) {
(Some(to), Some(from)) if to < from => return future::ok(Vec::new()).boxed(),
(Some(_), Some(_)) => {},
_ => return future::err(errors::unknown_block()).boxed(),
}

let maybe_future = self.sync.with_context(move |ctx| {
// find all headers which match the filter, and fetch the receipts for each one.
// match them with their numbers for easy sorting later.
let bit_combos = filter.bloom_possibilities();
let receipts_futures: Vec<_> = self.client.ancestry_iter(filter.to_block)
.take_while(|ref hdr| BlockId::Number(hdr.number()) != filter.from_block)
.take_while(|ref hdr| BlockId::Hash(hdr.hash()) != filter.from_block)
.filter(|ref hdr| {
let hdr_bloom = hdr.log_bloom();
bit_combos.iter().find(|&bloom| hdr_bloom & *bloom == *bloom).is_some()
})
.map(|hdr| (hdr.number(), request::BlockReceipts(hdr.into())))
.map(|(num, req)| self.on_demand.request(ctx, req).expect(NO_INVALID_BACK_REFS).map(move |x| (num, x)))
.collect();

// as the receipts come in, find logs within them which match the filter.
// insert them into a BTreeMap to maintain order by number and block index.
stream::futures_unordered(receipts_futures)
.fold(BTreeMap::new(), move |mut matches, (num, receipts)| {
for (block_index, log) in receipts.into_iter().flat_map(|r| r.logs).enumerate() {
if filter.matches(&log) {
matches.insert((num, block_index), log.into());
}
}
future::ok(matches)
}) // and then collect them into a vector.
.map(|matches| matches.into_iter().map(|(_, v)| v).collect())
.map_err(errors::on_demand_cancel)
});

match maybe_future {
Some(fut) => fut.boxed(),
None => future::err(errors::network_disabled()).boxed(),
}
}
}
13 changes: 12 additions & 1 deletion rpc/src/v1/helpers/subscribers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<T> Subscribers<T> {
}
}

impl <T> Subscribers<Sink<T>> {
impl<T> Subscribers<Sink<T>> {
/// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>) {
let id = self.next_id();
Expand All @@ -106,6 +106,17 @@ impl <T> Subscribers<Sink<T>> {
}
}

impl<T, V> Subscribers<(Sink<T>, V)> {
/// Assigns id and adds a subscriber to the list.
pub fn push(&mut self, sub: Subscriber<T>, val: V) {
let id = self.next_id();
if let Ok(sink) = sub.assign_id(SubscriptionId::String(id.as_string())) {
debug!(target: "pubsub", "Adding subscription id={:?}", id);
self.subscriptions.insert(id, (sink, val));
}
}
}

impl<T> ops::Deref for Subscribers<T> {
type Target = HashMap<Id, T>;

Expand Down
175 changes: 140 additions & 35 deletions rpc/src/v1/impls/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,60 @@
use std::sync::Arc;
use std::collections::BTreeMap;

use futures::{self, BoxFuture, Future};
use futures::{self, future, BoxFuture, Future};
use jsonrpc_core::Error;
use jsonrpc_macros::Trailing;
use jsonrpc_macros::pubsub::{Sink, Subscriber};
use jsonrpc_pubsub::SubscriptionId;

use v1::helpers::{errors, Subscribers};
use v1::helpers::{errors, limit_logs, Subscribers};
use v1::helpers::light_fetch::LightFetch;
use v1::metadata::Metadata;
use v1::traits::EthPubSub;
use v1::types::{pubsub, RichHeader};
use v1::types::{pubsub, RichHeader, Log};

use ethcore::encoded;
use ethcore::filter::Filter as EthFilter;
use ethcore::client::{BlockChainClient, ChainNotify, BlockId};
use ethsync::LightSync;
use light::cache::Cache;
use light::on_demand::OnDemand;
use light::client::{LightChainClient, LightChainNotify};
use parity_reactor::Remote;
use util::{Mutex, H256, Bytes};
use util::{RwLock, Mutex, H256, Bytes};

type Client = Sink<pubsub::Result>;

/// Eth PubSub implementation.
pub struct EthPubSubClient<C> {
handler: Arc<ChainNotificationHandler<C>>,
heads_subscribers: Arc<Mutex<Subscribers<Sink<pubsub::Result>>>>,
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
}

impl<C> EthPubSubClient<C> {
/// Creates new `EthPubSubClient`.
pub fn new(client: Arc<C>, remote: Remote) -> Self {
let heads_subscribers = Arc::new(Mutex::new(Subscribers::default()));
let heads_subscribers = Arc::new(RwLock::new(Subscribers::default()));
let logs_subscribers = Arc::new(RwLock::new(Subscribers::default()));
EthPubSubClient {
handler: Arc::new(ChainNotificationHandler {
client,
remote,
heads_subscribers: heads_subscribers.clone(),
logs_subscribers: logs_subscribers.clone(),
}),
heads_subscribers,
logs_subscribers,
}
}

/// Creates new `EthPubSubCient` with deterministic subscription ids.
#[cfg(test)]
pub fn new_test(client: Arc<C>, remote: Remote) -> Self {
let client = Self::new(client, remote);
*client.heads_subscribers.lock() = Subscribers::new_test();
*client.heads_subscribers.write() = Subscribers::new_test();
*client.logs_subscribers.write() = Subscribers::new_test();
client
}

Expand All @@ -70,42 +82,116 @@ impl<C> EthPubSubClient<C> {
}
}

impl EthPubSubClient<LightFetch> {
/// Creates a new `EthPubSubClient` for `LightClient`.
pub fn light(
client: Arc<LightChainClient>,
on_demand: Arc<OnDemand>,
sync: Arc<LightSync>,
cache: Arc<Mutex<Cache>>,
remote: Remote,
) -> Self {
let fetch = LightFetch {
client,
on_demand,
sync,
cache
};
EthPubSubClient::new(Arc::new(fetch), remote)
}
}

/// PubSub Notification handler.
pub struct ChainNotificationHandler<C> {
client: Arc<C>,
remote: Remote,
heads_subscribers: Arc<Mutex<Subscribers<Sink<pubsub::Result>>>>,
heads_subscribers: Arc<RwLock<Subscribers<Client>>>,
logs_subscribers: Arc<RwLock<Subscribers<(Client, EthFilter)>>>,
}

impl<C> ChainNotificationHandler<C> {
fn notify(&self, blocks: Vec<(encoded::Header, BTreeMap<String, String>)>) {
for subscriber in self.heads_subscribers.lock().values() {
for &(ref block, ref extra_info) in &blocks {
self.remote.spawn(subscriber
.notify(Ok(pubsub::Result::Header(RichHeader {
inner: block.into(),
extra_info: extra_info.clone(),
})))
.map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
);
fn notify(remote: &Remote, subscriber: &Client, result: pubsub::Result) {
remote.spawn(subscriber
.notify(Ok(result))
.map(|_| ())
.map_err(|e| warn!(target: "rpc", "Unable to send notification: {}", e))
);
}

fn notify_heads(&self, headers: &[(encoded::Header, BTreeMap<String, String>)]) {
for subscriber in self.heads_subscribers.read().values() {
for &(ref header, ref extra_info) in headers {
Self::notify(&self.remote, subscriber, pubsub::Result::Header(RichHeader {
inner: header.into(),
extra_info: extra_info.clone(),
}));
}
}
}

fn notify_logs<F>(&self, enacted: &[H256], logs: F) where
F: Fn(EthFilter) -> BoxFuture<Vec<Log>, Error>,
{
for &(ref subscriber, ref filter) in self.logs_subscribers.read().values() {
let logs = futures::future::join_all(enacted
.iter()
.map(|hash| {
let mut filter = filter.clone();
filter.from_block = BlockId::Hash(*hash);
filter.to_block = filter.from_block.clone();
logs(filter)
})
.collect::<Vec<_>>()
);
let limit = filter.limit;
let remote = self.remote.clone();
let subscriber = subscriber.clone();
self.remote.spawn(logs
.map(move |logs| {
let logs = logs.into_iter().flat_map(|log| log).collect();
let logs = limit_logs(logs, limit);
if !logs.is_empty() {
Self::notify(&remote, &subscriber, pubsub::Result::Logs(logs));
}
})
.map_err(|e| warn!("Unable to fetch latest logs: {:?}", e))
);
}
}
}

/// A light client wrapper struct.
pub trait LightClient: Send + Sync {
/// Get a recent block header.
fn block_header(&self, id: BlockId) -> Option<encoded::Header>;

/// Fetch logs.
fn logs(&self, filter: EthFilter) -> BoxFuture<Vec<Log>, Error>;
}

impl LightClient for LightFetch {
fn block_header(&self, id: BlockId) -> Option<encoded::Header> {
self.client.block_header(id)
}

fn logs(&self, filter: EthFilter) -> BoxFuture<Vec<Log>, Error> {
LightFetch::logs(self, filter)
}
}

impl<C: LightChainClient> LightChainNotify for ChainNotificationHandler<C> {
impl<C: LightClient> LightChainNotify for ChainNotificationHandler<C> {
fn new_headers(
&self,
headers: &[H256],
enacted: &[H256],
) {
let blocks = headers
let headers = enacted
.iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(*hash)))
.map(|header| (header, Default::default()))
.collect();
.collect::<Vec<_>>();

self.notify(blocks);
self.notify_heads(&headers);
self.notify_logs(&enacted, |filter| self.client.logs(filter))
}
}

Expand All @@ -115,22 +201,37 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
_imported: Vec<H256>,
_invalid: Vec<H256>,
enacted: Vec<H256>,
_retracted: Vec<H256>,
retracted: Vec<H256>,
_sealed: Vec<H256>,
// Block bytes.
_proposed: Vec<Bytes>,
_duration: u64,
) {
const EXTRA_INFO_PROOF: &'static str = "Object exists in in blockchain (fetched earlier), extra_info is always available if object exists; qed";
let blocks = enacted
.into_iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(hash)))
let headers = enacted
.iter()
.filter_map(|hash| self.client.block_header(BlockId::Hash(*hash)))
.map(|header| {
let hash = header.hash();
(header, self.client.block_extra_info(BlockId::Hash(hash)).expect(EXTRA_INFO_PROOF))
})
.collect();
self.notify(blocks);
.collect::<Vec<_>>();

// Headers
self.notify_heads(&headers);

// Enacted logs
self.notify_logs(&enacted, |filter| {
future::ok(self.client.logs(filter).into_iter().map(Into::into).collect()).boxed()
});

// Retracted logs
self.notify_logs(&retracted, |filter| {
future::ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| {
log.log_type = "removed".into();
log
}).collect()).boxed()
});
}
}

Expand All @@ -144,10 +245,12 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
kind: pubsub::Kind,
params: Trailing<pubsub::Params>,
) {
let params: Option<pubsub::Params> = params.into();
match (kind, params) {
match (kind, params.into()) {
(pubsub::Kind::NewHeads, None) => {
self.heads_subscribers.lock().push(subscriber)
self.heads_subscribers.write().push(subscriber)
},
(pubsub::Kind::Logs, Some(pubsub::Params::Logs(filter))) => {
self.logs_subscribers.write().push(subscriber, filter.into());
},
_ => {
let _ = subscriber.reject(errors::unimplemented(None));
Expand All @@ -156,7 +259,9 @@ impl<C: Send + Sync + 'static> EthPubSub for EthPubSubClient<C> {
}

fn unsubscribe(&self, id: SubscriptionId) -> BoxFuture<bool, Error> {
let res = self.heads_subscribers.lock().remove(&id).is_some();
futures::future::ok(res).boxed()
let res = self.heads_subscribers.write().remove(&id).is_some();
let res2 = self.logs_subscribers.write().remove(&id).is_some();

future::ok(res || res2).boxed()
}
}
Loading