-
Notifications
You must be signed in to change notification settings - Fork 794
Conversation
{ | ||
// then if we have a TX to broadcast, start | ||
// broadcasting it | ||
if let Some(next_to_broadcast) = this.txns.pop() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if we have already broadcast the most escalated tx, we will just wait on that one indefinitely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
completed!(this, Ok(receipt)); | ||
} | ||
// rewake until drained | ||
Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this cx.waker().wake_by_ref()
necessary? Won't executor automatically put the EscalatingPending future back in queue? Other paths here that return Poll::Pending don't call wake
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the goal is to explicitly notify the executor that this is ready to be polled again immediately. i.e. override the executor's scheduling and jump to the front of the queue as much as we have the ability to, as we know that there's no async operation happening to wait on
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this section looks like it could be wrapped in a loop over all futures? So that Poll::Ready(Ok(None)) =>
merely results in a continue?
Is the order in which they should be polled here important, if not then Vec::swap_remove
could be useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean it kinda is right now, it's just ensuring that it allows the executor to schedule each step of the loop and interleave other work
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looping over all futures is an anti-pattern I think, the state machine may be slightly easier to write but as @prestwich says it's better to do it on a per-fut basis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the CheckingReceipts
state essentially a FuturesOrdered
but we only poll one future at a time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some minor qs, love the direction
ethers-providers/src/lib.rs
Outdated
r | ||
}) | ||
.map(|req| async move { | ||
self.sign(req.rlp(chain_id), from).await.map(|sig| req.rlp_signed(chain_id, &sig)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really bad UX? Should we change it to something better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a shortcut function for signing transactions might be good?
broadcast_checks!(cx, this, fut); | ||
} | ||
Sleeping(instant) => { | ||
if instant.elapsed() > *this.polling_interval { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@gakonst please check this pattern out, as I am unclear why we use Delay
futures elsewhere. is there some significant behavior difference I'm missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the futures_timer Delay type uses an additional global timerhandle thread that makes sure the delayed task gets woken up at the right time, however, futures_timer hasn't been updated in a while there's also tokio::time::Sleep now, which I guess works similarly but probably uses some Runtime Handle instead.
looking at the Sleeping state, there might be a case were the Future returns Pending and gets not woken up again (elpased < polling_interval)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::time::Sleep
won't work in WASM iirc
I believe all common executors schedule futures for polling in frames regardless of whether the waker has explicitly requested waking
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't finish the previous thought before submitting the tx
given that this could cause issues with uncommon executors, should we audit all our futures to ensure that all polls either request rewaking or poll another future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like this could even be two different types.
An additional Stream
type that processes all transactions.
The EscalatingPending
Future that polls the underlying stream and returns the first value?
completed!(this, Ok(receipt)); | ||
} | ||
// rewake until drained | ||
Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this section looks like it could be wrapped in a loop over all futures? So that Poll::Ready(Ok(None)) =>
merely results in a continue?
Is the order in which they should be polled here important, if not then Vec::swap_remove
could be useful.
76e79e2
to
a832b48
Compare
What, would be nice is if this could be somehow integrated with the current I believe this would be possible with an additional future type ( this would be a breaking change however. The easiest way would probably be to add another provider function, as is the case now. |
the problem with that is that the signing must be done in the top-level middleware. It can't be done by the provider that the pending transaction has access to. So that feature is blocked by an object-safe |
ah I see, makes sense, this would then be limited to only the SignerMiddleware... |
Not exactly, as the raw provider may sign in many cases (using eth_sign or similar). However, relying on the provider would result in a conflict if any E.g |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looking good, I am OK with placing it as an extra function as-is. I also liked @mattsse's idea of adding an extra type on top of PendingTransaction
so that we can do send_transaction(tx).await?.escalator(...).await?
to escalate and send_transaction(tx).await?.await?
for simple usage
} | ||
|
||
pub fn broadcast_interval(mut self, duration: u64) -> Self { | ||
self.broadcast_interval = Duration::from_secs(duration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use milliseconds here, just for flexbility sake
self.broadcast_interval = Duration::from_secs(duration); | |
self.broadcast_interval = Duration::from_millis(duration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
going to change it to use Into<Duration>
to match the behavior of PendingTransaction
} | ||
|
||
pub fn polling_interval(mut self, duration: u64) -> Self { | ||
self.polling_interval = Duration::from_secs(duration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.polling_interval = Duration::from_secs(duration); | |
self.polling_interval = Duration::from_millis(duration); |
} | ||
|
||
pub fn polling_interval(mut self, duration: u64) -> Self { | ||
self.polling_interval = Duration::from_secs(duration); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.polling_interval = Duration::from_secs(duration); | |
self.polling_interval = Duration::from_millis(duration); |
completed!(this, Ok(receipt)); | ||
} | ||
// rewake until drained | ||
Poll::Ready(Ok(None)) => cx.waker().wake_by_ref(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looping over all futures is an anti-pattern I think, the state machine may be slightly easier to write but as @prestwich says it's better to do it on a per-fut basis
latest commit uses |
I have no other outstanding changes so I'll update the changelog and this is ready for consideration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it. Thank you for the hard work, we can iterate on it post-merge. @mattsse any further comments?
lgtm |
gg. Should we receive the escalator middleware? |
Same reasoning as gakonst#566
Motivation
Provide a reasonable and working alternative to gas escalator middleware.
Solution
EscalatingPending
that broadcasts a series of transactionsPendingTransaction
but with a simpler (but less informative) state machinesend_escalating
things I don't like
PR Checklist