Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

feat: stream_with_meta #354

Merged
merged 1 commit into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 2 additions & 37 deletions ethers-contract/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{stream::EventStream, ContractError, EthLogDecode};
use crate::{log::LogMeta, stream::EventStream, ContractError, EthLogDecode};

use ethers_core::{
abi::{Detokenize, RawLog},
types::{Address, BlockNumber, Filter, Log, TxHash, ValueOrArray, H256, U256, U64},
types::{BlockNumber, Filter, Log, ValueOrArray, H256},
};
use ethers_providers::{FilterWatcher, Middleware, PubsubClient, SubscriptionStream};
use std::borrow::Cow;
Expand Down Expand Up @@ -214,38 +214,3 @@ where
.map_err(From::from)
}
}

/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,

/// The block in which the log was emitted
pub block_number: U64,

/// The block hash in which the log was emitted
pub block_hash: H256,

/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,

/// Transactions index position log was created from
pub transaction_index: U64,

/// Log index position in the block
pub log_index: U256,
}

impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}
4 changes: 2 additions & 2 deletions ethers-contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ mod factory;
pub use factory::ContractFactory;

mod event;
pub use event::{EthEvent, LogMeta};
pub use event::EthEvent;

mod log;
pub use log::{decode_logs, EthLogDecode};
pub use log::{decode_logs, EthLogDecode, LogMeta};

mod stream;

Expand Down
36 changes: 36 additions & 0 deletions ethers-contract/src/log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Mod of types for ethereum logs
use ethers_core::abi::Error;
use ethers_core::abi::RawLog;
use ethers_core::types::{Address, Log, TxHash, H256, U256, U64};

/// A trait for types (events) that can be decoded from a `RawLog`
pub trait EthLogDecode: Send + Sync {
Expand All @@ -14,3 +15,38 @@ pub trait EthLogDecode: Send + Sync {
pub fn decode_logs<T: EthLogDecode>(logs: &[RawLog]) -> Result<Vec<T>, Error> {
logs.iter().map(T::decode_log).collect()
}

/// Metadata inside a log
#[derive(Clone, Debug, PartialEq)]
pub struct LogMeta {
/// Address from which this log originated
pub address: Address,

/// The block in which the log was emitted
pub block_number: U64,

/// The block hash in which the log was emitted
pub block_hash: H256,

/// The transaction hash in which the log was emitted
pub transaction_hash: TxHash,

/// Transactions index position log was created from
pub transaction_index: U64,

/// Log index position in the block
pub log_index: U256,
}

impl From<&Log> for LogMeta {
fn from(src: &Log) -> Self {
LogMeta {
address: src.address,
block_number: src.block_number.expect("should have a block number"),
block_hash: src.block_hash.expect("should have a block hash"),
transaction_hash: src.transaction_hash.expect("should have a tx hash"),
transaction_index: src.transaction_index.expect("should have a tx index"),
log_index: src.log_index.expect("should have a log index"),
}
}
}
30 changes: 30 additions & 0 deletions ethers-contract/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::LogMeta;
use ethers_core::types::{Log, U256};
use futures_util::stream::{Stream, StreamExt};
use pin_project::pin_project;
Expand All @@ -19,6 +20,12 @@ pub struct EventStream<'a, T, R, E> {
parse: MapEvent<'a, R, E>,
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn with_meta(self) -> EventStreamMeta<'a, T, R, E> {
EventStreamMeta(self)
}
}

impl<'a, T, R, E> EventStream<'a, T, R, E> {
pub fn new(id: U256, stream: T, parse: MapEvent<'a, R, E>) -> Self {
Self { id, stream, parse }
Expand All @@ -39,3 +46,26 @@ where
}
}
}

#[pin_project]
pub struct EventStreamMeta<'a, T, R, E>(pub EventStream<'a, T, R, E>);

impl<'a, T, R, E> Stream for EventStreamMeta<'a, T, R, E>
where
T: Stream<Item = Log> + Unpin,
{
type Item = Result<(R, LogMeta), E>;

fn poll_next(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
match futures_util::ready!(this.0.stream.poll_next_unpin(ctx)) {
Some(item) => {
let meta = LogMeta::from(&item);
let res = (this.0.parse)(item);
let res = res.map(|inner| (inner, meta));
Poll::Ready(Some(res))
}
None => Poll::Pending,
}
}
}
17 changes: 16 additions & 1 deletion ethers-contract/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ mod eth_tests {
let (abi, bytecode) = compile_contract("SimpleStorage", "SimpleStorage.sol");
let ganache = Ganache::new().spawn();
let client = connect(&ganache, 0);
let contract = deploy(client, abi.clone(), bytecode).await;
let contract = deploy(client.clone(), abi.clone(), bytecode).await;

// We spawn the event listener:
let event = contract.event::<ValueChanged>();
Expand All @@ -292,9 +292,13 @@ mod eth_tests {
let mut subscription = event2.subscribe().await.unwrap();
assert_eq!(subscription.id, 2.into());

let mut subscription_meta = event2.subscribe().await.unwrap().with_meta();
assert_eq!(subscription_meta.0.id, 3.into());

let num_calls = 3u64;

// and we make a few calls
let num = client.get_block_number().await.unwrap();
for i in 0..num_calls {
let call = contract
.method::<_, H256>("setValue", i.to_string())
Expand All @@ -307,8 +311,19 @@ mod eth_tests {
// unwrap the option of the stream, then unwrap the decoding result
let log = stream.next().await.unwrap().unwrap();
let log2 = subscription.next().await.unwrap().unwrap();
let (log3, meta) = subscription_meta.next().await.unwrap().unwrap();
assert_eq!(log.new_value, log3.new_value);
assert_eq!(log.new_value, log2.new_value);
assert_eq!(log.new_value, i.to_string());
assert_eq!(meta.block_number, num + i + 1);
let hash = client
.get_block(num + i + 1)
.await
.unwrap()
.unwrap()
.hash
.unwrap();
assert_eq!(meta.block_hash, hash);
}
}

Expand Down