Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
Prestwich/super pending (#566)
Browse files Browse the repository at this point in the history
* feature: pending_escalator

* feature: send_escalating in Middleware

* bug: don't drop unready futures in escalator.poll

* chore: docs and must_use

* chores: lints, clippies, wasm fixes, dedup pinboxfut

* chore: more lints

* refactor: use Delay in polling interval to ensure re-waking

* refactor: simplify Sleeping state transition as last will never be None again

* bug: properly set last when broadcasts resolve

* feature: debug implementation for EscalatingPending

* refactor: with_ setters and escalations argument

* refactor: use FuturesUnOrdered instead of a vec of futures

* chore: update CHANGELOG with recent PR info

* chore: update all comments on pending escalator

* chore: run rustfmt
  • Loading branch information
prestwich authored Nov 12, 2021
1 parent e9d4012 commit 203b2e8
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
- Use rust types as contract function inputs for human readable abi [#482](https://github.com/gakonst/ethers-rs/pull/482)
- Add EIP-712 `sign_typed_data` signer method; add ethers-core type `Eip712` trait and derive macro in ethers-derive-eip712 [#481](https://github.com/gakonst/ethers-rs/pull/481)
- `LocalWallet::new_keystore` now returns a tuple `(LocalWallet, String)` instead of `LocalWallet`, where the string represents the UUID of the newly created encrypted JSON keystore. The JSON keystore is stored as a file `/dir/uuid`. The issue [#557](https://github.com/gakonst/ethers-rs/issues/557) is addressed [#559](https://github.com/gakonst/ethers-rs/pull/559)
- add the missing constructor for `Timelag` middleware via [#568](https://github.com/gakonst/ethers-rs/pull/568)
- re-export error types for `Http` and `Ws` providers in [#570](https://github.com/gakonst/ethers-rs/pull/570)
- add a method on the `Middleware` to broadcast a tx with a series of escalating gas prices via [#566](https://github.com/gakonst/ethers-rs/pull/566)

### 0.5.3

Expand Down
46 changes: 46 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
//! # }
//! ```
mod transports;
use futures_util::future::join_all;
pub use transports::*;

mod provider;
Expand All @@ -74,6 +75,9 @@ pub mod ens;
mod pending_transaction;
pub use pending_transaction::PendingTransaction;

mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand All @@ -89,6 +93,9 @@ use std::{error::Error, fmt::Debug, future::Future, pin::Pin, str::FromStr};

pub use provider::{FilterKind, Provider, ProviderError};

/// A simple gas escalation policy
pub type EscalationPolicy = Box<dyn Fn(U256, usize) -> U256 + Send + Sync>;

// Helper type alias
#[cfg(target_arch = "wasm32")]
pub(crate) type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + 'a>>;
Expand Down Expand Up @@ -283,6 +290,45 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().send_transaction(tx, block).await.map_err(FromErr::from)
}

/// Send a transaction with a simple escalation policy.
///
/// `policy` should be a boxed function that maps `original_gas_price`
/// and `number_of_previous_escalations` -> `new_gas_price`.
///
/// e.g. `Box::new(|start, escalation_index| start * 1250.pow(escalations) /
/// 1000.pow(escalations))`
async fn send_escalating<'a>(
&'a self,
tx: &TypedTransaction,
escalations: usize,
policy: EscalationPolicy,
) -> Result<EscalatingPending<'a, Self::Provider>, Self::Error> {
let mut original = tx.clone();
self.fill_transaction(&mut original, None).await?;
let gas_price = original.gas_price().expect("filled");
let chain_id = self.get_chainid().await?.low_u64();
let sign_futs: Vec<_> = (0..escalations)
.map(|i| {
let new_price = policy(gas_price, i);
let mut r = original.clone();
r.set_gas_price(new_price);
r
})
.map(|req| async move {
self.sign(req.rlp(chain_id), &self.default_sender().unwrap_or_default())
.await
.map(|sig| req.rlp_signed(chain_id, &sig))
})
.collect();

// we reverse for convenience. Ensuring that we can always just
// `pop()` the next tx off the back later
let mut signed = join_all(sign_futs).await.into_iter().collect::<Result<Vec<_>, _>>()?;
signed.reverse();

Ok(EscalatingPending::new(self.provider(), signed))
}

async fn resolve_name(&self, ens_name: &str) -> Result<Address, Self::Error> {
self.inner().resolve_name(ens_name).await.map_err(FromErr::from)
}
Expand Down
240 changes: 240 additions & 0 deletions ethers-providers/src/pending_escalator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use ethers_core::types::{Bytes, TransactionReceipt, H256};
use futures_util::{stream::FuturesUnordered, StreamExt};
use pin_project::pin_project;
use std::{
future::Future,
pin::Pin,
task::Poll,
time::{Duration, Instant},
};

#[cfg(not(target_arch = "wasm32"))]
use futures_timer::Delay;
#[cfg(target_arch = "wasm32")]
use wasm_timer::Delay;

use crate::{JsonRpcClient, Middleware, PendingTransaction, PinBoxFut, Provider, ProviderError};

/// States for the EscalatingPending future
enum EscalatorStates<'a, P> {
Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
Sleeping(Pin<Box<Delay>>),
BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
Completed,
}

/// An EscalatingPending is a pending transaction that increases its own gas
/// price over time, by broadcasting successive versions with higher gas prices.
#[must_use]
#[pin_project(project = PendingProj)]
#[derive(Debug)]
pub struct EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
provider: &'a Provider<P>,
broadcast_interval: Duration,
polling_interval: Duration,
txns: Vec<Bytes>,
last: Instant,
sent: Vec<H256>,
state: EscalatorStates<'a, P>,
}

impl<'a, P> EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
/// Instantiate a new EscalatingPending. This should only be called by the
/// Middleware trait.
///
/// Callers MUST ensure that transactions are in _reverse_ broadcast order
/// (this just makes writing the code easier, as we can use `pop()` a lot).
///
/// TODO: consider deserializing and checking invariants (gas order, etc.)
pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
if txns.is_empty() {
panic!("bad args");
}

let first = txns.pop().expect("bad args");
// Sane-feeling default intervals
Self {
provider,
broadcast_interval: Duration::from_millis(150),
polling_interval: Duration::from_millis(10),
txns,
// placeholder value. We set this again after the initial broadcast
// future resolves
last: Instant::now(),
sent: vec![],
state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
}
}

/// Set the broadcast interval. This controls how often the escalator
/// broadcasts a new transaction at a higher gas price
pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
self.broadcast_interval = duration.into();
self
}

/// Set the polling interval. This controls how often the escalator checks
/// transaction receipts for confirmation.
pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
self.polling_interval = duration.into();
self
}

/// Get the current polling interval.
pub fn get_polling_interval(&self) -> Duration {
self.polling_interval
}

/// Get the current broadcast interval.
pub fn get_broadcast_interval(&self) -> Duration {
self.broadcast_interval
}
}

macro_rules! check_all_receipts {
($cx:ident, $this:ident) => {
let futs: futures_util::stream::FuturesUnordered<_> = $this
.sent
.iter()
.map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
.collect();
*$this.state = CheckingReceipts(futs);
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! sleep {
($cx:ident, $this:ident) => {
*$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
$cx.waker().wake_by_ref();
return Poll::Pending
};
}

macro_rules! completed {
($this:ident, $output:expr) => {
*$this.state = Completed;
return Poll::Ready($output)
};
}

macro_rules! poll_broadcast_fut {
($cx:ident, $this:ident, $fut:ident) => {
match $fut.as_mut().poll($cx) {
Poll::Ready(Ok(pending)) => {
*$this.last = Instant::now();
$this.sent.push(*pending);
tracing::info!(
tx_hash = ?*pending,
escalation = $this.sent.len(),
"Escalation transaction broadcast complete"
);
check_all_receipts!($cx, $this);
}
Poll::Ready(Err(e)) => {
tracing::error!(
error = ?e,
"Error during transaction broadcast"
);
completed!($this, Err(e));
}
Poll::Pending => return Poll::Pending,
}
};
}

impl<'a, P> Future for EscalatingPending<'a, P>
where
P: JsonRpcClient,
{
type Output = Result<TransactionReceipt, ProviderError>;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
use EscalatorStates::*;

let this = self.project();

match this.state {
// In the initial state we're simply waiting on the first
// transaction braodcast to complete.
Initial(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
Sleeping(delay) => {
let _ready = futures_util::ready!(delay.as_mut().poll(cx));
// if broadcast timer has elapsed and if we have a TX to
// broadcast, broadcast it
if this.last.elapsed() > *this.broadcast_interval {
if let Some(next_to_broadcast) = this.txns.pop() {
let fut = this.provider.send_raw_transaction(next_to_broadcast);
*this.state = BroadcastingNew(fut);
cx.waker().wake_by_ref();
return Poll::Pending
}
}
check_all_receipts!(cx, this);
}
// This state is functionally equivalent to Initial, but we
// differentiate it for clarity
BroadcastingNew(fut) => {
poll_broadcast_fut!(cx, this, fut);
}
CheckingReceipts(futs) => {
// Poll the set of `get_transaction_receipt` futures to check
// if any previously-broadcast transaction was confirmed.
// Continue doing this until all are resolved
match futs.poll_next_unpin(cx) {
// We have found a receipt. This means that all other
// broadcast txns are now invalid, so we can drop the
// futures and complete
Poll::Ready(Some(Ok(Some(receipt)))) => {
completed!(this, Ok(receipt));
}
// A `get_transaction_receipt` request resolved, but but we
// found no receipt, rewake and check if any other requests
// are resolved
Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
// A request errored. We complete the future with the error.
Poll::Ready(Some(Err(e))) => {
completed!(this, Err(e));
}
// We have run out of `get_transaction_receipt` requests.
// Sleep and then check if we should broadcast again (or
// check receipts again)
Poll::Ready(None) => {
sleep!(cx, this);
}
// No request has resolved yet. Try again later
Poll::Pending => return Poll::Pending,
}
}
Completed => panic!("polled after completion"),
}

Poll::Pending
}
}

impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = match self {
Self::Initial(_) => "Initial",
Self::Sleeping(_) => "Sleeping",
Self::BroadcastingNew(_) => "BroadcastingNew",
Self::CheckingReceipts(_) => "CheckingReceipts",
Self::Completed => "Completed",
};
f.debug_struct("EscalatorStates").field("state", &state).finish()
}
}
1 change: 1 addition & 0 deletions ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
use crate::CeloMiddleware;
use crate::Middleware;
use async_trait::async_trait;

use ethers_core::{
abi::{self, Detokenize, ParamType},
types::{
Expand Down

0 comments on commit 203b2e8

Please sign in to comment.