chainntnfs: handle historical confs and spends asynchronously #1628
Conversation
Looks to have PR-specific failures on the itests rn.

}
}

return fmt.Errorf("spending transaction not found within block range "+
Doesn't seem like this should be printed as an error? Can potentially make this a concrete error type we can filter out.
Fixed.
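For reference, a minimal sketch of what such a concrete error could look like (the type name and fields here are illustrative, not the ones ultimately used in the PR), so callers can filter it with a type assertion instead of matching on the formatted string:

```go
package chainntnfs

import "fmt"

// ErrTxNotFoundInRange is a hypothetical concrete error type for the
// "spending transaction not found" case above.
type ErrTxNotFoundInRange struct {
	StartHeight, EndHeight uint32
}

func (e ErrTxNotFoundInRange) Error() string {
	return fmt.Sprintf("spending transaction not found within block range "+
		"%d-%d", e.StartHeight, e.EndHeight)
}
```

A caller could then check `if _, ok := err.(ErrTxNotFoundInRange); ok { ... }` and log it at a lower level rather than as an error.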
@@ -867,69 +867,14 @@ func testSpendBeforeNtfnRegistration(miner *rpctest.Harness,
// concrete implementations.
//
// To do so, we first create a new output to our test target address.
txid, err := getTestTxId(miner)
👍
msg.heightHint)
// Look up whether the transaction is already
// included in the active chain.
confDetails, err := n.historicalConfDetails(
why not async?
Fixed.
// the confirmation details must be provided with the UpdateConfDetails method,
// otherwise we will wait for the transaction to confirm even though it already
// has.
func (tcn *TxConfNotifier) Register(ntfn *ConfNtfn) error {
Commit message says "registrations for unconfirmed transactions only", but seems like this will also be called for already confirmed transactions?
Clarified things a bit, let me know what you think.
@@ -245,24 +245,37 @@ out:
_, currentHeight, err := b.chainConn.GetBestBlock()
seems like this could be moved inside the go routine
chainntnfs/txconfnotifier.go
Outdated
// indicating that the transaction has already been
// confirmed.
select {
case ntfn.Event.Updates <- 0:
Can we risk the channel having a buffer one element too small in this case? (since it is created with numConfs capacity)
It's safe to do so as we don't send an update indicating the required number of confirmations. Now that I think about this though, it should do that in place of sending an update indicating there aren't any confirmations left, as that's already handled by the Confirmed channel. Will address it in a follow-up PR.
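To make the buffering argument concrete, here is a small, self-contained walk-through (illustrative only, not lnd code): once the initial "numConfs confirmations required" update is never sent, the only values pushed are numConfs-1 down to 0, i.e. exactly numConfs sends, so a buffer of numConfs never overflows.

```go
package main

import "fmt"

func main() {
	// With numConfs = 3, the Updates channel has capacity 3. The only
	// values ever pushed are 2, 1 and 0 -- exactly three sends -- so the
	// non-blocking sends below never hit the default case.
	numConfs := uint32(3)
	updates := make(chan uint32, numConfs)

	for confsLeft := numConfs - 1; ; confsLeft-- {
		select {
		case updates <- confsLeft:
		default:
			fmt.Println("buffer full, update dropped")
		}
		if confsLeft == 0 {
			break
		}
	}

	close(updates)
	for u := range updates {
		fmt.Println("confirmations left:", u)
	}
}
```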
chainntnfs/txconfnotifier.go
Outdated
if details.BlockHeight <= tcn.currentHeight {
numConfsLeft := confHeight - tcn.currentHeight
select {
case ntfn.Event.Updates <- numConfsLeft:
Seems like there might be an issue here if multiple callers register for confirmation of the same txid. Each time UpdateConfDetails is called as a result of this, all clients will be sent updates? This will lead to each client receiving the updates multiple times, and the channel might even get full.
From what I can gather, we shouldn't need to iterate the ntfn clients in this method, but pass in the one client that requested the historical dispatch, and only notify that one.
Yeah, I spent quite some time debugging this. Ended up adding an id so that we can uniquely track each notification. This is something that will be needed anyway once we support canceling confirmation notifications.
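Roughly, the shape of that change (a sketch only; the field and map names approximate the approach rather than quote the final code):

```go
package chainntnfs

import "github.com/btcsuite/btcd/chaincfg/chainhash"

// ConfNtfn gains a client-unique ID so a historical rescan can update exactly
// the registration that requested it, rather than fanning updates out to
// every client watching the same txid.
type ConfNtfn struct {
	// ConfID uniquely identifies this registration (name assumed; the
	// initial diff below uses ID).
	ConfID uint64

	TxID             *chainhash.Hash
	NumConfirmations uint32
	Event            *ConfirmationEvent

	// details is filled in either by ConnectTip or by UpdateConfDetails
	// once a historical rescan completes.
	details *TxConfirmation
}

// Registrations can then be keyed by txid first and ConfID second, e.g.
//
//	confNotifications map[chainhash.Hash]map[uint64]*ConfNtfn
//
// which lets UpdateConfDetails address a single client.
```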
chainntnfs/btcdnotify/btcd.go
Outdated
@@ -800,6 +801,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash,

ntfn := &confirmationNotification{
ConfNtfn: chainntnfs.ConfNtfn{
ID: atomic.AddUint64(&b.confClientCounter, 1),
confID for consistency
Fixed.
chainntnfs/txconfnotifier.go
Outdated
return nil
}

// UpdateConfDetails attempts to mark an existing unconfirmed transaction as
I can't make this godoc match what the method is actually doing.
Forgot to update it. Fixed.
// included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
go func() {
maybe introduce a waitgroup to wait for during shutdowns?
Not sure if it's really needed here 🤔
Yeah I don't think it's needed here. Otherwise a very distant rescan can cause the shutdown process to be held up.
we could thread a quit chan down to the rescan if we want. thinking it might be a good idea actually, to prevent the rescan from causing an accidental panic after the chain conns are deinitialized
I think it's a nice pattern to follow; test failures also tend to be harder to trace if there are stray goroutines remaining.
Fixed.
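A minimal, self-contained illustration of the pattern being settled on here — track the rescan goroutine with a WaitGroup and give it a quit channel — with illustrative names rather than lnd's actual fields:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// notifier is a stand-in for the chain notifier: wg lets Stop wait for the
// historical-rescan goroutine, and quit lets the goroutine bail out before
// touching torn-down chain connections.
type notifier struct {
	wg   sync.WaitGroup
	quit chan struct{}
}

func (n *notifier) dispatchHistorical() {
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()

		// Stand-in for a potentially long rescan.
		select {
		case <-time.After(50 * time.Millisecond):
		case <-n.quit:
			// Shutdown requested; exit without touching the
			// chain connection any further.
			return
		}
		fmt.Println("rescan finished, delivering conf details")
	}()
}

func (n *notifier) stop() {
	close(n.quit)
	n.wg.Wait() // no stray goroutines left behind
}

func main() {
	n := &notifier{quit: make(chan struct{})}
	n.dispatchHistorical()
	n.stop()
}
```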
}
if confDetails != nil {
err := b.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ID,
just pass the msg.ConfNtfn itself.
I think it's better to keep it as is to make sure the notification has been registered with the notifier first.
if we move the Register call right before, that would make those calls more tightly coupled, and I think even the whole ConfID can be removed?
Sure, but then it's expected for the caller to know this, rather than just providing an interface that ensures the notification has been registered first. Having an ID will also come in handy when adding support for cancelling existing notifications (there's an existing TODO for this).
rescanUpdate := []neutrino.UpdateOption{
neutrino.AddTxIDs(*msg.TxID),
neutrino.Rewind(currentHeight),
}
if err := n.chainView.Update(rescanUpdate...); err != nil {
chainntnfs.Log.Errorf("unable to update rescan: %v", err)
err = n.chainView.Update(rescanUpdate...)
just a reminder to make sure we cannot potentially miss a confirmation if a block comes in while this goroutine is running.
What do you mean by this @halseth ?
We are passing in currentHeight, which might be outdated at this point (because of a potentially long-running historicalConfDetails). I think this should be okay, but just meant that we should check that it indeed is.
Good point, it's likely we'd rescan all of the manual dispatch done before. Since current height is behind a mutex, should be safe to read again?
currentHeight is a copy so it should remain unchanged even if the actual height changes.
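For illustration, a self-contained sketch of the trade-off being weighed (hypothetical heightMtx/bestHeight fields, not lnd's actual layout): a height captured by value before the goroutine starts never reflects blocks that arrive during a long rescan, whereas re-reading the mutex-guarded height inside the goroutine does:

```go
package main

import (
	"fmt"
	"sync"
)

type notifier struct {
	heightMtx  sync.Mutex
	bestHeight uint32
}

func (n *notifier) currentBestHeight() uint32 {
	n.heightMtx.Lock()
	defer n.heightMtx.Unlock()
	return n.bestHeight
}

func main() {
	n := &notifier{bestHeight: 104}

	captured := n.currentBestHeight() // copy taken before the rescan

	// A block comes in while the historical rescan is running.
	n.heightMtx.Lock()
	n.bestHeight = 105
	n.heightMtx.Unlock()

	fmt.Println("captured copy:", captured)                // still 104
	fmt.Println("re-read height:", n.currentBestHeight()) // 105
}
```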
chainntnfs/btcdnotify/btcd.go
Outdated
// included in the active chain. We'll do this
// in a goroutine to prevent blocking
// potentially long rescans.
go func() {
I think there might be a potential race here:
Imagine tx A is confirmed in block 101, and our currentHeight is 104. We want a notification on 5 confirmations.
[100] <- [101: A] <- [102] <- [103] <- [104]
We start a rescan from heightHint = 100 to 104. While we do this, block 105 comes in. In ConnectTip we check ntfn.details == nil and skip it. The rescan finishes, ntfn.details is set, and the notification for block 105 is never sent.
I'm still thinking about how to best solve this without essentially ending back up at our sync rescan behaviour. One potential approach would be to always do a rescan from heightHint to bestHeight without registering the notification client first (that way new blocks can come in without altering our client), and then keep doing this until you know for sure that you have scanned all the way to the current best height (can be enforced with a mutex). Then you register the client (within the same scope of the mutex), making sure no block can come in in the meantime.
I was probably wrong, looks like the tcn.Lock acquired in UpdateConfDetails makes sure that no new block gets processed before we are done with the notifying, so we should be good 👍
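For completeness, a compact sketch (not a verbatim excerpt of txconfnotifier.go) of why that locking rules out the race: UpdateConfDetails and ConnectTip serialize on the same mutex, so no block can be connected while rescan results are being applied, and any block connected afterwards sees the populated details.

```go
package txconfsketch

import "sync"

// TxConfNotifier here is a pared-down stand-in for chainntnfs.TxConfNotifier,
// just enough to show the locking relationship described above.
type TxConfNotifier struct {
	sync.Mutex
	currentHeight uint32
}

// UpdateConfDetails applies the result of a historical rescan under the
// notifier's mutex.
func (tcn *TxConfNotifier) UpdateConfDetails(confHeight uint32) {
	tcn.Lock()
	defer tcn.Unlock()
	// Record the details; if confHeight <= tcn.currentHeight the
	// notification can be dispatched right away.
}

// ConnectTip processes a new block under the same mutex, so it runs either
// strictly before or strictly after UpdateConfDetails.
func (tcn *TxConfNotifier) ConnectTip(height uint32) {
	tcn.Lock()
	defer tcn.Unlock()
	tcn.currentHeight = height
	// Any registration whose details were set by UpdateConfDetails before
	// this point is seen here and dispatched as usual.
}
```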
TxID: txid,
NumConfirmations: numConfs,
Event: chainntnfs.NewConfirmationEvent(numConfs),
},
heightHint: heightHint,
}

if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
Can we just move this call to where we are handling the confirmationsNotification, before we start the historical dispatch?
We can, but I figured it made more sense having it here like we do with NotifySpent in RegisterSpendNtfn.
For readability I think it would make sense to move it there, as they always need to be called in that order, right after one another, right?
Register must now be called first. Since it's possible that it fails, we should return the error to the caller rather than just logging it, as otherwise the caller will be expecting to receive the confirmation details for a failed registration. At the moment, it's unlikely for it to return an error other than when shutting down, but it could prevent an obscure bug later down the road.
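Sketched out, the ordering being described (approximate, not the final diff; notificationRegistry and quit are assumed to be the notifier's existing registration and quit channels, and the shutdown error is assumed to be the ErrTxConfNotifierExiting defined elsewhere in the package):

```go
// Register first and surface its error to the caller, so a failed
// registration never leaves someone waiting on confirmation details.
if err := n.txConfNotifier.Register(&ntfn.ConfNtfn); err != nil {
	return nil, err
}

// Only then hand the request off for (possibly historical) dispatch.
select {
case n.notificationRegistry <- ntfn:
	return ntfn.Event, nil
case <-n.quit:
	// Assumed shutdown error; the real code may use a different one.
	return nil, chainntnfs.ErrTxConfNotifierExiting
}
```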
if confDetails != nil {
err := n.txConfNotifier.UpdateConfDetails(
*msg.TxID, msg.ID,
Not sure if we actually need the new ID if we can just pass the ConfNtfn in here instead.
Same comment as above w.r.t. ensuring the notification has been registered first.
chainntnfs/txconfnotifier.go
Outdated
return nil
}

// The notifier has yet to reach the height at which the transaction was
This is a bit confusing, how would we ever know in advance if the conf intent is already satisfied? Seems like it should read DispatchBlockHeight here instead of just a block height.
If it's the block height that the transaction was initially confirmed at, why can we determine this independent of the notifier itself advancing? Do we have a test to exercise this case?
The reason behind the check is due to the lag between the TxConfNotifier and the backend. For example, let's say that the notifier starts at height 100 and the transaction confirms at 102. The backend moves up to 102, but the TxConfNotifier is still processing height 101. In the event that UpdateConfDetails is called before ConnectTip(..., 102, ...), the notifier isn't yet aware of this height, so we defer handling the notification dispatch until then.

Not sure what you mean by DispatchBlockHeight. Are you referring to renaming the method UpdateConfDetails to that?
@@ -237,7 +241,7 @@ out:
b.spendNotifications[op] = make(map[uint64]*spendNotification)
}
b.spendNotifications[op][msg.spendID] = msg
b.chainConn.NotifySpent([]*wire.OutPoint{&op})
Looks like this was a duplicate call even? We already do this at the notification registration site.
Tested this on my testnet nodes with 50+ channels. Before this commit, the startup time was measured in minutes (time from startup to responding to getinfo). After this commit, the startup time is measured in seconds. The one lingering issue, which we've discussed offline, is the fact that the Start() method is blocking, which can cause contention with the main server mutex. Once that final issue is patched, restarts will be much, much speedier.
LGTM 🦑
Nice job @wpaulino, looks pretty complete to me!
started int32 // To be used atomically.
stopped int32 // To be used atomically.

confClientCounter uint64 // To be used atomically.
recommend putting 64-bit atomic vars before 32-bit ones, as this pattern always produces valid alignments on 32-bit machines
Fixed.
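The recommended layout, sketched against only the fields quoted above: the Go sync/atomic docs guarantee 64-bit alignment for the first word of an allocated struct, so putting the uint64 counter first keeps atomic.AddUint64 safe on 32-bit platforms.

```go
package btcdnotify

// Sketch of the suggested field ordering: 64-bit atomically-accessed fields
// first, 32-bit flags after, so the uint64 stays 8-byte aligned on 32-bit
// machines.
type BtcdNotifier struct {
	confClientCounter uint64 // To be used atomically.

	started int32 // To be used atomically.
	stopped int32 // To be used atomically.

	// remaining fields elided ...
}
```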
chainntnfs/btcdnotify/btcd.go
Outdated
started int32 // To be used atomically.
stopped int32 // To be used atomically.

confClientCounter uint64 // To be used aotmically.
same comment here about moving 64-bit vals before 32's, s/aotmically/atomically
Fixed.
chainntnfs/txconfnotifier.go
Outdated
// The notifier has yet to reach the height at which the transaction was
// included in a block, so we should defer until handling it then within
// ConnectTip.
if details.BlockHeight > tcn.currentHeight {
this line will panic if details is nil, which is possible as written. Might want to add a test for this
Fixed.
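The guard being described, roughly (approximate, not the exact diff): return before dereferencing details when the historical rescan found nothing, and fall through to the existing height check otherwise.

```go
// UpdateConfDetails is also called when the historical rescan turns up
// nothing, so guard before touching details; ConnectTip will dispatch once
// the transaction actually confirms.
if details == nil {
	return nil
}

if details.BlockHeight > tcn.currentHeight {
	// ... existing deferral logic quoted above ...
}
```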
var (
// ErrTxConfNotifierExiting is an error returned when attempting to
// interact with the TxConfNotifier but it been shut down.
ErrTxConfNotifierExiting = errors.New("TxConfNotifier is exiting")
👍
chainntnfs/txconfnotifier.go
Outdated
return nil
}

// UpdateConfDetails attempts to update the confirmation details for the an
nit: the
Fixed.
Mostly LGTM now (after addressing Conner's comments). I still think the …
… async

In this commit, we modify our TxConfNotifier struct to allow handling notification registrations asynchronously. The Register method has been refactored into two: Register and UpdateConfDetails. In the case that a transaction we registered for notifications on has already confirmed, we'll need to determine its confirmation details on our own. Once done, this can be provided within UpdateConfDetails. This change will pave the way for our different chain notifiers to handle potentially long rescans asynchronously to prevent blocking the caller.
LGTM 💣
In this PR, we modify our bitcoind and btcd chain notifiers to handle historical confirmations and spends asynchronously. This was motivated by users experiencing slow startups due to blocking during long rescans.

Fixes #1569.
Fixes #1616.
Fixes #1648.