-
Notifications
You must be signed in to change notification settings - Fork 601
/
Copy pathwallet.go
4107 lines (3618 loc) · 125 KB
/
wallet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2013-2017 The btcsuite developers
// Copyright (c) 2015-2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
package wallet
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/btcutil/hdkeychain"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain"
"github.com/btcsuite/btcwallet/waddrmgr"
"github.com/btcsuite/btcwallet/wallet/txauthor"
"github.com/btcsuite/btcwallet/wallet/txrules"
"github.com/btcsuite/btcwallet/walletdb"
"github.com/btcsuite/btcwallet/walletdb/migration"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/davecgh/go-spew/spew"
)
const (
// InsecurePubPassphrase is the default outer encryption passphrase used
// for public data (everything but private keys). Using a non-default
// public passphrase can prevent an attacker without the public
// passphrase from discovering all past and future wallet addresses if
// they gain access to the wallet database.
//
// NOTE: at time of writing, public encryption only applies to public
// data in the waddrmgr namespace. Transactions are not yet encrypted.
InsecurePubPassphrase = "public"
// recoveryBatchSize is the default number of blocks that will be
// scanned successively by the recovery manager, in the event that the
// wallet is started in recovery mode.
recoveryBatchSize = 2000
// defaultSyncRetryInterval is the default amount of time to wait
// between re-tries on errors during initial sync.
defaultSyncRetryInterval = 5 * time.Second
)
var (
// ErrNotSynced describes an error where an operation cannot complete
// due wallet being out of sync (and perhaps currently syncing with)
// the remote chain server.
ErrNotSynced = errors.New("wallet is not synchronized with the chain server")
// ErrWalletShuttingDown is an error returned when we attempt to make a
// request to the wallet but it is in the process of or has already shut
// down.
ErrWalletShuttingDown = errors.New("wallet shutting down")
// ErrUnknownTransaction is returned when an attempt is made to label
// a transaction that is not known to the wallet.
ErrUnknownTransaction = errors.New("cannot label transaction not " +
"known to wallet")
// ErrTxLabelExists is returned when a transaction already has a label
// and an attempt has been made to label it without setting overwrite
// to true.
ErrTxLabelExists = errors.New("transaction already labelled")
// ErrNoTx is returned when a transaction can not be found.
ErrNoTx = errors.New("can not find transaction")
// ErrTxUnsigned is returned when a transaction is created in the
// watch-only mode where we can select coins but not sign any inputs.
ErrTxUnsigned = errors.New("watch-only wallet, transaction not signed")
// Namespace bucket keys.
waddrmgrNamespaceKey = []byte("waddrmgr")
wtxmgrNamespaceKey = []byte("wtxmgr")
)
// Coin represents a spendable UTXO which is available for coin selection.
type Coin struct {
wire.TxOut
wire.OutPoint
}
// CoinSelectionStrategy is an interface that represents a coin selection
// strategy. A coin selection strategy is responsible for ordering, shuffling or
// filtering a list of coins before they are passed to the coin selection
// algorithm.
type CoinSelectionStrategy interface {
// ArrangeCoins takes a list of coins and arranges them according to the
// specified coin selection strategy and fee rate.
ArrangeCoins(eligible []Coin, feeSatPerKb btcutil.Amount) ([]Coin,
error)
}
var (
// CoinSelectionLargest always picks the largest available utxo to add
// to the transaction next.
CoinSelectionLargest CoinSelectionStrategy = &LargestFirstCoinSelector{}
// CoinSelectionRandom randomly selects the next utxo to add to the
// transaction. This strategy prevents the creation of ever smaller
// utxos over time.
CoinSelectionRandom CoinSelectionStrategy = &RandomCoinSelector{}
)
// Wallet is a structure containing all the components for a
// complete wallet. It contains the Armory-style key store
// addresses and keys),
type Wallet struct {
publicPassphrase []byte
// Data stores
db walletdb.DB
Manager *waddrmgr.Manager
TxStore *wtxmgr.Store
chainClient chain.Interface
chainClientLock sync.Mutex
chainClientSynced bool
chainClientSyncMtx sync.Mutex
newAddrMtx sync.Mutex
lockedOutpoints map[wire.OutPoint]struct{}
lockedOutpointsMtx sync.Mutex
recovering atomic.Value
recoveryWindow uint32
// Channels for rescan processing. Requests are added and merged with
// any waiting requests, before being sent to another goroutine to
// call the rescan RPC.
rescanAddJob chan *RescanJob
rescanBatch chan *rescanBatch
rescanNotifications chan interface{} // From chain server
rescanProgress chan *RescanProgressMsg
rescanFinished chan *RescanFinishedMsg
// Channel for transaction creation requests.
createTxRequests chan createTxRequest
// Channels for the manager locker.
unlockRequests chan unlockRequest
lockRequests chan struct{}
holdUnlockRequests chan chan heldUnlock
lockState chan bool
changePassphrase chan changePassphraseRequest
changePassphrases chan changePassphrasesRequest
NtfnServer *NotificationServer
chainParams *chaincfg.Params
wg sync.WaitGroup
started bool
quit chan struct{}
quitMu sync.Mutex
// syncRetryInterval is the amount of time to wait between re-tries on
// errors during initial sync.
syncRetryInterval time.Duration
}
// Start starts the goroutines necessary to manage a wallet.
func (w *Wallet) Start() {
w.quitMu.Lock()
select {
case <-w.quit:
// Restart the wallet goroutines after shutdown finishes.
w.WaitForShutdown()
w.quit = make(chan struct{})
default:
// Ignore when the wallet is still running.
if w.started {
w.quitMu.Unlock()
return
}
w.started = true
}
w.quitMu.Unlock()
w.wg.Add(2)
go w.txCreator()
go w.walletLocker()
}
// SynchronizeRPC associates the wallet with the consensus RPC client,
// synchronizes the wallet with the latest changes to the blockchain, and
// continuously updates the wallet through RPC notifications.
//
// This method is unstable and will be removed when all syncing logic is moved
// outside of the wallet package.
func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) {
w.quitMu.Lock()
select {
case <-w.quit:
w.quitMu.Unlock()
return
default:
}
w.quitMu.Unlock()
// TODO: Ignoring the new client when one is already set breaks callers
// who are replacing the client, perhaps after a disconnect.
w.chainClientLock.Lock()
if w.chainClient != nil {
w.chainClientLock.Unlock()
return
}
w.chainClient = chainClient
// If the chain client is a NeutrinoClient instance, set a birthday so
// we don't download all the filters as we go.
switch cc := chainClient.(type) {
case *chain.NeutrinoClient:
cc.SetStartTime(w.Manager.Birthday())
case *chain.BitcoindClient:
cc.SetBirthday(w.Manager.Birthday())
}
w.chainClientLock.Unlock()
// TODO: It would be preferable to either run these goroutines
// separately from the wallet (use wallet mutator functions to
// make changes from the RPC client) and not have to stop and
// restart them each time the client disconnects and reconnets.
w.wg.Add(4)
go w.handleChainNotifications()
go w.rescanBatchHandler()
go w.rescanProgressHandler()
go w.rescanRPCHandler()
}
// requireChainClient marks that a wallet method can only be completed when the
// consensus RPC server is set. This function and all functions that call it
// are unstable and will need to be moved when the syncing code is moved out of
// the wallet.
func (w *Wallet) requireChainClient() (chain.Interface, error) {
w.chainClientLock.Lock()
chainClient := w.chainClient
w.chainClientLock.Unlock()
if chainClient == nil {
return nil, errors.New("blockchain RPC is inactive")
}
return chainClient, nil
}
// ChainClient returns the optional consensus RPC client associated with the
// wallet.
//
// This function is unstable and will be removed once sync logic is moved out of
// the wallet.
func (w *Wallet) ChainClient() chain.Interface {
w.chainClientLock.Lock()
chainClient := w.chainClient
w.chainClientLock.Unlock()
return chainClient
}
// quitChan atomically reads the quit channel.
func (w *Wallet) quitChan() <-chan struct{} {
w.quitMu.Lock()
c := w.quit
w.quitMu.Unlock()
return c
}
// Stop signals all wallet goroutines to shutdown.
func (w *Wallet) Stop() {
w.quitMu.Lock()
quit := w.quit
w.quitMu.Unlock()
select {
case <-quit:
default:
close(quit)
w.chainClientLock.Lock()
if w.chainClient != nil {
w.chainClient.Stop()
w.chainClient = nil
}
w.chainClientLock.Unlock()
}
}
// ShuttingDown returns whether the wallet is currently in the process of
// shutting down or not.
func (w *Wallet) ShuttingDown() bool {
select {
case <-w.quitChan():
return true
default:
return false
}
}
// WaitForShutdown blocks until all wallet goroutines have finished executing.
func (w *Wallet) WaitForShutdown() {
w.chainClientLock.Lock()
if w.chainClient != nil {
w.chainClient.WaitForShutdown()
}
w.chainClientLock.Unlock()
w.wg.Wait()
}
// SynchronizingToNetwork returns whether the wallet is currently synchronizing
// with the Bitcoin network.
func (w *Wallet) SynchronizingToNetwork() bool {
// At the moment, RPC is the only synchronization method. In the
// future, when SPV is added, a separate check will also be needed, or
// SPV could always be enabled if RPC was not explicitly specified when
// creating the wallet.
w.chainClientSyncMtx.Lock()
syncing := w.chainClient != nil
w.chainClientSyncMtx.Unlock()
return syncing
}
// ChainSynced returns whether the wallet has been attached to a chain server
// and synced up to the best block on the main chain.
func (w *Wallet) ChainSynced() bool {
w.chainClientSyncMtx.Lock()
synced := w.chainClientSynced
w.chainClientSyncMtx.Unlock()
return synced
}
// SetChainSynced marks whether the wallet is connected to and currently in sync
// with the latest block notified by the chain server.
//
// NOTE: Due to an API limitation with rpcclient, this may return true after
// the client disconnected (and is attempting a reconnect). This will be unknown
// until the reconnect notification is received, at which point the wallet can be
// marked out of sync again until after the next rescan completes.
func (w *Wallet) SetChainSynced(synced bool) {
w.chainClientSyncMtx.Lock()
w.chainClientSynced = synced
w.chainClientSyncMtx.Unlock()
}
// activeData returns the currently-active receiving addresses and all unspent
// outputs. This is primarely intended to provide the parameters for a
// rescan request.
func (w *Wallet) activeData(dbtx walletdb.ReadWriteTx) ([]btcutil.Address, []wtxmgr.Credit, error) {
addrmgrNs := dbtx.ReadBucket(waddrmgrNamespaceKey)
txmgrNs := dbtx.ReadWriteBucket(wtxmgrNamespaceKey)
var addrs []btcutil.Address
err := w.Manager.ForEachRelevantActiveAddress(
addrmgrNs, func(addr btcutil.Address) error {
addrs = append(addrs, addr)
return nil
},
)
if err != nil {
return nil, nil, err
}
// Before requesting the list of spendable UTXOs, we'll delete any
// expired output locks.
err = w.TxStore.DeleteExpiredLockedOutputs(
dbtx.ReadWriteBucket(wtxmgrNamespaceKey),
)
if err != nil {
return nil, nil, err
}
unspent, err := w.TxStore.UnspentOutputs(txmgrNs)
return addrs, unspent, err
}
// syncWithChain brings the wallet up to date with the current chain server
// connection. It creates a rescan request and blocks until the rescan has
// finished. The birthday block can be passed in, if set, to ensure we can
// properly detect if it gets rolled back.
func (w *Wallet) syncWithChain(birthdayStamp *waddrmgr.BlockStamp) error {
chainClient, err := w.requireChainClient()
if err != nil {
return err
}
// Neutrino relies on the information given to it by the cfheader server
// so it knows exactly whether it's synced up to the server's state or
// not, even on dev chains. To recover a Neutrino wallet, we need to
// make sure it's synced before we start scanning for addresses,
// otherwise we might miss some if we only scan up to its current sync
// point.
neutrinoRecovery := chainClient.BackEnd() == "neutrino" &&
w.recoveryWindow > 0
// We'll wait until the backend is synced to ensure we get the latest
// MaxReorgDepth blocks to store. We don't do this for development
// environments as we can't guarantee a lively chain, except for
// Neutrino, where the cfheader server tells us what it believes the
// chain tip is.
if !w.isDevEnv() || neutrinoRecovery {
log.Debug("Waiting for chain backend to sync to tip")
if err := w.waitUntilBackendSynced(chainClient); err != nil {
return err
}
log.Debug("Chain backend synced to tip!")
}
// If we've yet to find our birthday block, we'll do so now.
if birthdayStamp == nil {
var err error
birthdayStamp, err = locateBirthdayBlock(
chainClient, w.Manager.Birthday(),
)
if err != nil {
return fmt.Errorf("unable to locate birthday block: %w",
err)
}
// We'll also determine our initial sync starting height. This
// is needed as the wallet can now begin storing blocks from an
// arbitrary height, rather than all the blocks from genesis, so
// we persist this height to ensure we don't store any blocks
// before it.
startHeight := birthdayStamp.Height
// With the starting height obtained, get the remaining block
// details required by the wallet.
startHash, err := chainClient.GetBlockHash(int64(startHeight))
if err != nil {
return err
}
startHeader, err := chainClient.GetBlockHeader(startHash)
if err != nil {
return err
}
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
err := w.Manager.SetSyncedTo(ns, &waddrmgr.BlockStamp{
Hash: *startHash,
Height: startHeight,
Timestamp: startHeader.Timestamp,
})
if err != nil {
return err
}
return w.Manager.SetBirthdayBlock(ns, *birthdayStamp, true)
})
if err != nil {
return fmt.Errorf("unable to persist initial sync "+
"data: %w", err)
}
}
// If the wallet requested an on-chain recovery of its funds, we'll do
// so now.
if w.recoveryWindow > 0 {
if err := w.recovery(chainClient, birthdayStamp); err != nil {
return fmt.Errorf("unable to perform wallet recovery: "+
"%w", err)
}
}
// Compare previously-seen blocks against the current chain. If any of
// these blocks no longer exist, rollback all of the missing blocks
// before catching up with the rescan.
rollback := false
rollbackStamp := w.Manager.SyncedTo()
err = walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
addrmgrNs := tx.ReadWriteBucket(waddrmgrNamespaceKey)
txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey)
for height := rollbackStamp.Height; true; height-- {
hash, err := w.Manager.BlockHash(addrmgrNs, height)
if err != nil {
return err
}
chainHash, err := chainClient.GetBlockHash(int64(height))
if err != nil {
return err
}
header, err := chainClient.GetBlockHeader(chainHash)
if err != nil {
return err
}
rollbackStamp.Hash = *chainHash
rollbackStamp.Height = height
rollbackStamp.Timestamp = header.Timestamp
if bytes.Equal(hash[:], chainHash[:]) {
break
}
rollback = true
}
// If a rollback did not happen, we can proceed safely.
if !rollback {
return nil
}
// Otherwise, we'll mark this as our new synced height.
err := w.Manager.SetSyncedTo(addrmgrNs, &rollbackStamp)
if err != nil {
return err
}
// If the rollback happened to go beyond our birthday stamp,
// we'll need to find a new one by syncing with the chain again
// until finding one.
if rollbackStamp.Height <= birthdayStamp.Height &&
rollbackStamp.Hash != birthdayStamp.Hash {
err := w.Manager.SetBirthdayBlock(
addrmgrNs, rollbackStamp, true,
)
if err != nil {
return err
}
}
// Finally, we'll roll back our transaction store to reflect the
// stale state. `Rollback` unconfirms transactions at and beyond
// the passed height, so add one to the new synced-to height to
// prevent unconfirming transactions in the synced-to block.
return w.TxStore.Rollback(txmgrNs, rollbackStamp.Height+1)
})
if err != nil {
return err
}
// Request notifications for connected and disconnected blocks.
//
// TODO(jrick): Either request this notification only once, or when
// rpcclient is modified to allow some notification request to not
// automatically resent on reconnect, include the notifyblocks request
// as well. I am leaning towards allowing off all rpcclient
// notification re-registrations, in which case the code here should be
// left as is.
if err := chainClient.NotifyBlocks(); err != nil {
return err
}
// Finally, we'll trigger a wallet rescan and request notifications for
// transactions sending to all wallet addresses and spending all wallet
// UTXOs.
var (
addrs []btcutil.Address
unspent []wtxmgr.Credit
)
err = walletdb.Update(w.db, func(dbtx walletdb.ReadWriteTx) error {
addrs, unspent, err = w.activeData(dbtx)
return err
})
if err != nil {
return err
}
return w.rescanWithTarget(addrs, unspent, nil)
}
// isDevEnv determines whether the wallet is currently under a local developer
// environment, e.g. simnet or regtest.
func (w *Wallet) isDevEnv() bool {
switch uint32(w.ChainParams().Net) {
case uint32(chaincfg.RegressionNetParams.Net):
case uint32(chaincfg.SimNetParams.Net):
default:
return false
}
return true
}
// waitUntilBackendSynced blocks until the chain backend considers itself
// "current".
func (w *Wallet) waitUntilBackendSynced(chainClient chain.Interface) error {
// We'll poll every second to determine if our chain considers itself
// "current".
t := time.NewTicker(time.Second)
defer t.Stop()
for {
select {
case <-t.C:
if chainClient.IsCurrent() {
return nil
}
case <-w.quitChan():
return ErrWalletShuttingDown
}
}
}
// locateBirthdayBlock returns a block that meets the given birthday timestamp
// by a margin of +/-2 hours. This is safe to do as the timestamp is already 2
// days in the past of the actual timestamp.
func locateBirthdayBlock(chainClient chainConn,
birthday time.Time) (*waddrmgr.BlockStamp, error) {
// Retrieve the lookup range for our block.
startHeight := int32(0)
_, bestHeight, err := chainClient.GetBestBlock()
if err != nil {
return nil, err
}
log.Debugf("Locating suitable block for birthday %v between blocks "+
"%v-%v", birthday, startHeight, bestHeight)
var (
birthdayBlock *waddrmgr.BlockStamp
left, right = startHeight, bestHeight
)
// Binary search for a block that meets the birthday timestamp by a
// margin of +/-2 hours.
for {
// Retrieve the timestamp for the block halfway through our
// range.
mid := left + (right-left)/2
hash, err := chainClient.GetBlockHash(int64(mid))
if err != nil {
return nil, err
}
header, err := chainClient.GetBlockHeader(hash)
if err != nil {
return nil, err
}
log.Debugf("Checking candidate block: height=%v, hash=%v, "+
"timestamp=%v", mid, hash, header.Timestamp)
// If the search happened to reach either of our range extremes,
// then we'll just use that as there's nothing left to search.
if mid == startHeight || mid == bestHeight || mid == left {
birthdayBlock = &waddrmgr.BlockStamp{
Hash: *hash,
Height: mid,
Timestamp: header.Timestamp,
}
break
}
// The block's timestamp is more than 2 hours after the
// birthday, so look for a lower block.
if header.Timestamp.Sub(birthday) > birthdayBlockDelta {
right = mid
continue
}
// The birthday is more than 2 hours before the block's
// timestamp, so look for a higher block.
if header.Timestamp.Sub(birthday) < -birthdayBlockDelta {
left = mid
continue
}
birthdayBlock = &waddrmgr.BlockStamp{
Hash: *hash,
Height: mid,
Timestamp: header.Timestamp,
}
break
}
log.Debugf("Found birthday block: height=%d, hash=%v, timestamp=%v",
birthdayBlock.Height, birthdayBlock.Hash,
birthdayBlock.Timestamp)
return birthdayBlock, nil
}
// recoverySyncer is used to synchronize wallet and address manager locking
// with the end of recovery. (*Wallet).recovery will store a recoverySyncer
// when invoked, and will close the done chan upon exit. Setting the quit flag
// will cause recovery to end after the current batch of blocks.
type recoverySyncer struct {
done chan struct{}
quit uint32 // atomic
}
// recovery attempts to recover any unspent outputs that pay to any of our
// addresses starting from our birthday, or the wallet's tip (if higher), which
// would indicate resuming a recovery after a restart.
func (w *Wallet) recovery(chainClient chain.Interface,
birthdayBlock *waddrmgr.BlockStamp) error {
log.Infof("RECOVERY MODE ENABLED -- rescanning for used addresses "+
"with recovery_window=%d", w.recoveryWindow)
// Wallet locking must synchronize with the end of recovery, since use of
// keys in recovery is racy with manager IsLocked checks, which could
// result in enrypting data with a zeroed key.
syncer := &recoverySyncer{done: make(chan struct{})}
w.recovering.Store(syncer)
defer close(syncer.done)
// We'll initialize the recovery manager with a default batch size of
// 2000.
recoveryMgr := NewRecoveryManager(
w.recoveryWindow, recoveryBatchSize, w.chainParams,
)
// In the event that this recovery is being resumed, we will need to
// repopulate all found addresses from the database. Ideally, for basic
// recovery, we would only do so for the default scopes, but due to a
// bug in which the wallet would create change addresses outside of the
// default scopes, it's necessary to attempt all registered key scopes.
scopedMgrs := make(map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager)
for _, scopedMgr := range w.Manager.ActiveScopedKeyManagers() {
scopedMgrs[scopedMgr.Scope()] = scopedMgr
}
err := walletdb.View(w.db, func(tx walletdb.ReadTx) error {
txMgrNS := tx.ReadBucket(wtxmgrNamespaceKey)
credits, err := w.TxStore.UnspentOutputs(txMgrNS)
if err != nil {
return err
}
addrMgrNS := tx.ReadBucket(waddrmgrNamespaceKey)
return recoveryMgr.Resurrect(addrMgrNS, scopedMgrs, credits)
})
if err != nil {
return err
}
// Fetch the best height from the backend to determine when we should
// stop.
_, bestHeight, err := chainClient.GetBestBlock()
if err != nil {
return err
}
// Now we can begin scanning the chain from the wallet's current tip to
// ensure we properly handle restarts. Since the recovery process itself
// acts as rescan, we'll also update our wallet's synced state along the
// way to reflect the blocks we process and prevent rescanning them
// later on.
//
// NOTE: We purposefully don't update our best height since we assume
// that a wallet rescan will be performed from the wallet's tip, which
// will be of bestHeight after completing the recovery process.
var blocks []*waddrmgr.BlockStamp
startHeight := w.Manager.SyncedTo().Height + 1
for height := startHeight; height <= bestHeight; height++ {
if atomic.LoadUint32(&syncer.quit) == 1 {
return errors.New("recovery: forced shutdown")
}
hash, err := chainClient.GetBlockHash(int64(height))
if err != nil {
return err
}
header, err := chainClient.GetBlockHeader(hash)
if err != nil {
return err
}
blocks = append(blocks, &waddrmgr.BlockStamp{
Hash: *hash,
Height: height,
Timestamp: header.Timestamp,
})
// It's possible for us to run into blocks before our birthday
// if our birthday is after our reorg safe height, so we'll make
// sure to not add those to the batch.
if height >= birthdayBlock.Height {
recoveryMgr.AddToBlockBatch(
hash, height, header.Timestamp,
)
}
// We'll perform our recovery in batches of 2000 blocks. It's
// possible for us to reach our best height without exceeding
// the recovery batch size, so we can proceed to commit our
// state to disk.
recoveryBatch := recoveryMgr.BlockBatch()
if len(recoveryBatch) == recoveryBatchSize || height == bestHeight {
err := walletdb.Update(w.db, func(tx walletdb.ReadWriteTx) error {
ns := tx.ReadWriteBucket(waddrmgrNamespaceKey)
for _, block := range blocks {
err := w.Manager.SetSyncedTo(ns, block)
if err != nil {
return err
}
}
return w.recoverScopedAddresses(
chainClient, tx, ns, recoveryBatch,
recoveryMgr.State(), scopedMgrs,
)
})
if err != nil {
return err
}
if len(recoveryBatch) > 0 {
log.Infof("Recovered addresses from blocks "+
"%d-%d", recoveryBatch[0].Height,
recoveryBatch[len(recoveryBatch)-1].Height)
}
// Clear the batch of all processed blocks to reuse the
// same memory for future batches.
blocks = blocks[:0]
recoveryMgr.ResetBlockBatch()
}
}
return nil
}
// recoverScopedAddresses scans a range of blocks in attempts to recover any
// previously used addresses for a particular account derivation path. At a high
// level, the algorithm works as follows:
//
// 1. Ensure internal and external branch horizons are fully expanded.
// 2. Filter the entire range of blocks, stopping if a non-zero number of
// address are contained in a particular block.
// 3. Record all internal and external addresses found in the block.
// 4. Record any outpoints found in the block that should be watched for spends
// 5. Trim the range of blocks up to and including the one reporting the addrs.
// 6. Repeat from (1) if there are still more blocks in the range.
//
// TODO(conner): parallelize/pipeline/cache intermediate network requests
func (w *Wallet) recoverScopedAddresses(
chainClient chain.Interface,
tx walletdb.ReadWriteTx,
ns walletdb.ReadWriteBucket,
batch []wtxmgr.BlockMeta,
recoveryState *RecoveryState,
scopedMgrs map[waddrmgr.KeyScope]*waddrmgr.ScopedKeyManager) error {
// If there are no blocks in the batch, we are done.
if len(batch) == 0 {
return nil
}
log.Infof("Scanning %d blocks for recoverable addresses", len(batch))
expandHorizons:
for scope, scopedMgr := range scopedMgrs {
scopeState := recoveryState.StateForScope(scope)
err := expandScopeHorizons(ns, scopedMgr, scopeState)
if err != nil {
return err
}
}
// With the internal and external horizons properly expanded, we now
// construct the filter blocks request. The request includes the range
// of blocks we intend to scan, in addition to the scope-index -> addr
// map for all internal and external branches.
filterReq := newFilterBlocksRequest(batch, scopedMgrs, recoveryState)
// Initiate the filter blocks request using our chain backend. If an
// error occurs, we are unable to proceed with the recovery.
filterResp, err := chainClient.FilterBlocks(filterReq)
if err != nil {
return err
}
// If the filter response is empty, this signals that the rest of the
// batch was completed, and no other addresses were discovered. As a
// result, no further modifications to our recovery state are required
// and we can proceed to the next batch.
if filterResp == nil {
return nil
}
// Otherwise, retrieve the block info for the block that detected a
// non-zero number of address matches.
block := batch[filterResp.BatchIndex]
// Log any non-trivial findings of addresses or outpoints.
logFilterBlocksResp(block, filterResp)
// Report any external or internal addresses found as a result of the
// appropriate branch recovery state. Adding indexes above the
// last-found index of either will result in the horizons being expanded
// upon the next iteration. Any found addresses are also marked used
// using the scoped key manager.
err = extendFoundAddresses(ns, filterResp, scopedMgrs, recoveryState)
if err != nil {
return err
}
// Update the global set of watched outpoints with any that were found
// in the block.
for outPoint, addr := range filterResp.FoundOutPoints {
outPoint := outPoint
recoveryState.AddWatchedOutPoint(&outPoint, addr)
}
// Finally, record all of the relevant transactions that were returned
// in the filter blocks response. This ensures that these transactions
// and their outputs are tracked when the final rescan is performed.
for _, txn := range filterResp.RelevantTxns {
txRecord, err := wtxmgr.NewTxRecordFromMsgTx(
txn, filterResp.BlockMeta.Time,
)
if err != nil {
return err
}
err = w.addRelevantTx(tx, txRecord, &filterResp.BlockMeta)
if err != nil {
return err
}
}
// Update the batch to indicate that we've processed all block through
// the one that returned found addresses.
batch = batch[filterResp.BatchIndex+1:]
// If this was not the last block in the batch, we will repeat the
// filtering process again after expanding our horizons.
if len(batch) > 0 {
goto expandHorizons
}
return nil
}
// expandScopeHorizons ensures that the ScopeRecoveryState has an adequately
// sized look ahead for both its internal and external branches. The keys
// derived here are added to the scope's recovery state, but do not affect the
// persistent state of the wallet. If any invalid child keys are detected, the
// horizon will be properly extended such that our lookahead always includes the
// proper number of valid child keys.
func expandScopeHorizons(ns walletdb.ReadWriteBucket,
scopedMgr *waddrmgr.ScopedKeyManager,
scopeState *ScopeRecoveryState) error {
// Compute the current external horizon and the number of addresses we
// must derive to ensure we maintain a sufficient recovery window for
// the external branch.
exHorizon, exWindow := scopeState.ExternalBranch.ExtendHorizon()
count, childIndex := uint32(0), exHorizon
for count < exWindow {
keyPath := externalKeyPath(childIndex)
addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath)
switch {
case err == hdkeychain.ErrInvalidChild:
// Record the existence of an invalid child with the
// external branch's recovery state. This also
// increments the branch's horizon so that it accounts
// for this skipped child index.
scopeState.ExternalBranch.MarkInvalidChild(childIndex)
childIndex++
continue
case err != nil:
return err
}
// Register the newly generated external address and child index
// with the external branch recovery state.
scopeState.ExternalBranch.AddAddr(childIndex, addr.Address())
childIndex++
count++
}
// Compute the current internal horizon and the number of addresses we
// must derive to ensure we maintain a sufficient recovery window for
// the internal branch.
inHorizon, inWindow := scopeState.InternalBranch.ExtendHorizon()
count, childIndex = 0, inHorizon
for count < inWindow {
keyPath := internalKeyPath(childIndex)
addr, err := scopedMgr.DeriveFromKeyPath(ns, keyPath)
switch {
case err == hdkeychain.ErrInvalidChild:
// Record the existence of an invalid child with the
// internal branch's recovery state. This also
// increments the branch's horizon so that it accounts
// for this skipped child index.
scopeState.InternalBranch.MarkInvalidChild(childIndex)
childIndex++
continue
case err != nil:
return err
}
// Register the newly generated internal address and child index
// with the internal branch recovery state.
scopeState.InternalBranch.AddAddr(childIndex, addr.Address())
childIndex++