-
Notifications
You must be signed in to change notification settings - Fork 119
/
Copy pathbitcoin_client.go
1403 lines (1282 loc) · 46.8 KB
/
bitcoin_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package zetaclient
import (
"bytes"
"encoding/hex"
"fmt"
"math"
"math/big"
"os"
"sort"
"strconv"
"sync"
"sync/atomic"
cosmosmath "cosmossdk.io/math"
"github.com/btcsuite/btcd/btcjson"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/zeta-chain/zetacore/common"
"github.com/zeta-chain/zetacore/x/crosschain/types"
observertypes "github.com/zeta-chain/zetacore/x/observer/types"
"github.com/zeta-chain/zetacore/zetaclient/config"
metricsPkg "github.com/zeta-chain/zetacore/zetaclient/metrics"
clienttypes "github.com/zeta-chain/zetacore/zetaclient/types"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
var _ ChainClient = &BitcoinChainClient{}
type BTCLog struct {
ChainLogger zerolog.Logger
WatchInTx zerolog.Logger
ObserveOutTx zerolog.Logger
WatchUTXOS zerolog.Logger
WatchGasPrice zerolog.Logger
}
// BitcoinChainClient represents a chain configuration for Bitcoin
// Filled with above constants depending on chain
type BitcoinChainClient struct {
*ChainMetrics
chain common.Chain
rpcClient BTCRPCClient
zetaClient ZetaCoreBridger
Tss TSSSigner
lastBlock int64
lastBlockScanned int64
BlockTime uint64 // block time in seconds
Mu *sync.Mutex // lock for all the maps, utxos and core params
pendingNonce uint64
includedTxHashes map[string]bool // key: tx hash
includedTxResults map[string]*btcjson.GetTransactionResult // key: chain-tss-nonce
broadcastedTx map[string]string // key: chain-tss-nonce, value: outTx hash
utxos []btcjson.ListUnspentResult
params observertypes.ChainParams
db *gorm.DB
stop chan struct{}
logger BTCLog
ts *TelemetryServer
BlockCache *lru.Cache
}
const (
minConfirmations = 0
maxHeightDiff = 10000
btcBlocksPerDay = 144
)
func (ob *BitcoinChainClient) WithZetaClient(bridge *ZetaCoreBridge) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.zetaClient = bridge
}
func (ob *BitcoinChainClient) WithLogger(logger zerolog.Logger) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.logger = BTCLog{
ChainLogger: logger,
WatchInTx: logger.With().Str("module", "WatchInTx").Logger(),
ObserveOutTx: logger.With().Str("module", "observeOutTx").Logger(),
WatchUTXOS: logger.With().Str("module", "WatchUTXOS").Logger(),
WatchGasPrice: logger.With().Str("module", "WatchGasPrice").Logger(),
}
}
func (ob *BitcoinChainClient) WithBtcClient(client *rpcclient.Client) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.rpcClient = client
}
func (ob *BitcoinChainClient) WithChain(chain common.Chain) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.chain = chain
}
func (ob *BitcoinChainClient) SetChainParams(params observertypes.ChainParams) {
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.params = params
}
func (ob *BitcoinChainClient) GetChainParams() observertypes.ChainParams {
ob.Mu.Lock()
defer ob.Mu.Unlock()
return ob.params
}
// NewBitcoinClient returns a new configuration based on supplied target chain
func NewBitcoinClient(
chain common.Chain,
bridge ZetaCoreBridger,
tss TSSSigner,
dbpath string,
metrics *metricsPkg.Metrics,
logger zerolog.Logger,
btcCfg config.BTCConfig,
ts *TelemetryServer,
) (*BitcoinChainClient, error) {
ob := BitcoinChainClient{
ChainMetrics: NewChainMetrics(chain.ChainName.String(), metrics),
ts: ts,
}
ob.stop = make(chan struct{})
ob.chain = chain
ob.Mu = &sync.Mutex{}
chainLogger := logger.With().Str("chain", chain.ChainName.String()).Logger()
ob.logger = BTCLog{
ChainLogger: chainLogger,
WatchInTx: chainLogger.With().Str("module", "WatchInTx").Logger(),
ObserveOutTx: chainLogger.With().Str("module", "observeOutTx").Logger(),
WatchUTXOS: chainLogger.With().Str("module", "WatchUTXOS").Logger(),
WatchGasPrice: chainLogger.With().Str("module", "WatchGasPrice").Logger(),
}
ob.zetaClient = bridge
ob.Tss = tss
ob.includedTxHashes = make(map[string]bool)
ob.includedTxResults = make(map[string]*btcjson.GetTransactionResult)
ob.broadcastedTx = make(map[string]string)
ob.params = btcCfg.ChainParams
// initialize the Client
ob.logger.ChainLogger.Info().Msgf("Chain %s endpoint %s", ob.chain.String(), btcCfg.RPCHost)
connCfg := &rpcclient.ConnConfig{
Host: btcCfg.RPCHost,
User: btcCfg.RPCUsername,
Pass: btcCfg.RPCPassword,
HTTPPostMode: true,
DisableTLS: true,
Params: btcCfg.RPCParams,
}
client, err := rpcclient.New(connCfg, nil)
if err != nil {
return nil, fmt.Errorf("error creating rpc client: %s", err)
}
ob.rpcClient = client
err = client.Ping()
if err != nil {
return nil, fmt.Errorf("error ping the bitcoin server: %s", err)
}
ob.BlockCache, err = lru.New(btcBlocksPerDay)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("failed to create bitcoin block cache")
return nil, err
}
err = ob.RegisterPromGauge(metricsPkg.PendingTxs, "Number of pending transactions")
if err != nil {
return nil, err
}
//Load btc chain client DB
err = ob.loadDB(dbpath)
if err != nil {
return nil, err
}
return &ob, nil
}
func (ob *BitcoinChainClient) Start() {
ob.logger.ChainLogger.Info().Msgf("BitcoinChainClient is starting")
go ob.WatchInTx()
go ob.observeOutTx()
go ob.WatchUTXOS()
go ob.WatchGasPrice()
go ob.ExternalChainWatcherForNewInboundTrackerSuggestions()
}
func (ob *BitcoinChainClient) Stop() {
ob.logger.ChainLogger.Info().Msgf("ob %s is stopping", ob.chain.String())
close(ob.stop) // this notifies all goroutines to stop
ob.logger.ChainLogger.Info().Msgf("%s observer stopped", ob.chain.String())
}
func (ob *BitcoinChainClient) SetLastBlockHeight(height int64) {
if height < 0 {
panic("lastBlock is negative")
}
atomic.StoreInt64(&ob.lastBlock, height)
}
func (ob *BitcoinChainClient) GetLastBlockHeight() int64 {
height := atomic.LoadInt64(&ob.lastBlock)
if height < 0 {
panic("lastBlock is negative")
}
return height
}
func (ob *BitcoinChainClient) SetLastBlockHeightScanned(height int64) {
if height < 0 {
panic("lastBlockScanned is negative")
}
atomic.StoreInt64(&ob.lastBlockScanned, height)
// #nosec G701 checked as positive
ob.ts.SetLastScannedBlockNumber((ob.chain.ChainId), uint64(height))
}
func (ob *BitcoinChainClient) GetLastBlockHeightScanned() int64 {
height := atomic.LoadInt64(&ob.lastBlockScanned)
if height < 0 {
panic("lastBlockScanned is negative")
}
return height
}
func (ob *BitcoinChainClient) GetPendingNonce() uint64 {
ob.Mu.Lock()
defer ob.Mu.Unlock()
return ob.pendingNonce
}
// GetBaseGasPrice ...
// TODO: implement
// https://github.com/zeta-chain/node/issues/868
func (ob *BitcoinChainClient) GetBaseGasPrice() *big.Int {
return big.NewInt(0)
}
func (ob *BitcoinChainClient) WatchInTx() {
ticker, err := NewDynamicTicker("Bitcoin_WatchInTx", ob.GetChainParams().InTxTicker)
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error")
return
}
defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.observeInTx()
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msg("WatchInTx error observing in tx")
}
ticker.UpdateInterval(ob.GetChainParams().InTxTicker, ob.logger.WatchInTx)
case <-ob.stop:
ob.logger.WatchInTx.Info().Msg("WatchInTx stopped")
return
}
}
}
func (ob *BitcoinChainClient) postBlockHeader(tip int64) error {
ob.logger.WatchInTx.Info().Msgf("postBlockHeader: tip %d", tip)
bn := tip
res, err := ob.zetaClient.GetBlockHeaderStateByChain(ob.chain.ChainId)
if err == nil && res.BlockHeaderState != nil && res.BlockHeaderState.EarliestHeight > 0 {
bn = res.BlockHeaderState.LatestHeight + 1
}
if bn > tip {
return fmt.Errorf("postBlockHeader: must post block confirmed block header: %d > %d", bn, tip)
}
res2, err := ob.GetBlockByNumberCached(bn)
if err != nil {
return fmt.Errorf("error getting bitcoin block %d: %s", bn, err)
}
var headerBuf bytes.Buffer
err = res2.Header.Serialize(&headerBuf)
if err != nil { // should never happen
ob.logger.WatchInTx.Error().Err(err).Msgf("error serializing bitcoin block header: %d", bn)
return err
}
blockHash := res2.Header.BlockHash()
_, err = ob.zetaClient.PostAddBlockHeader(
ob.chain.ChainId,
blockHash[:],
res2.Block.Height,
common.NewBitcoinHeader(headerBuf.Bytes()),
)
ob.logger.WatchInTx.Info().Msgf("posted block header %d: %s", bn, blockHash)
if err != nil { // error shouldn't block the process
ob.logger.WatchInTx.Error().Err(err).Msgf("error posting bitcoin block header: %d", bn)
}
return err
}
func (ob *BitcoinChainClient) observeInTx() error {
// make sure inbound TXS / Send is enabled by the protocol
flags, err := ob.zetaClient.GetCrosschainFlags()
if err != nil {
return err
}
if !flags.IsInboundEnabled {
return errors.New("inbound TXS / Send has been disabled by the protocol")
}
// get and update latest block height
cnt, err := ob.rpcClient.GetBlockCount()
if err != nil {
return fmt.Errorf("observeInTxBTC: error getting block count: %s", err)
}
if cnt < 0 {
return fmt.Errorf("observeInTxBTC: block count is negative: %d", cnt)
}
ob.SetLastBlockHeight(cnt)
// skip if current height is too low
// #nosec G701 always in range
confirmedBlockNum := cnt - int64(ob.GetChainParams().ConfirmationCount)
if confirmedBlockNum < 0 {
return fmt.Errorf("observeInTxBTC: skipping observer, current block number %d is too low", cnt)
}
// skip if no new block is confirmed
lastScanned := ob.GetLastBlockHeightScanned()
if lastScanned >= confirmedBlockNum {
return nil
}
// query incoming gas asset to TSS address
{
bn := lastScanned + 1
res, err := ob.GetBlockByNumberCached(bn)
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error getting bitcoin block %d", bn)
return err
}
ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: block %d has %d txs, current block %d, last block %d",
bn, len(res.Block.Tx), cnt, lastScanned)
// print some debug information
if len(res.Block.Tx) > 1 {
for idx, tx := range res.Block.Tx {
ob.logger.WatchInTx.Debug().Msgf("BTC InTX | %d: %s\n", idx, tx.Txid)
for vidx, vout := range tx.Vout {
ob.logger.WatchInTx.Debug().Msgf("vout %d \n value: %v\n scriptPubKey: %v\n", vidx, vout.Value, vout.ScriptPubKey.Hex)
}
}
}
// add block header to zetacore
if flags.BlockHeaderVerificationFlags != nil && flags.BlockHeaderVerificationFlags.IsBtcTypeChainEnabled {
err = ob.postBlockHeader(bn)
if err != nil {
ob.logger.WatchInTx.Warn().Err(err).Msgf("observeInTxBTC: error posting block header %d", bn)
}
}
tssAddress := ob.Tss.BTCAddress()
// #nosec G701 always positive
inTxs := FilterAndParseIncomingTx(
res.Block.Tx,
uint64(res.Block.Height),
tssAddress,
&ob.logger.WatchInTx,
ob.chain.ChainId,
)
// post inbound vote message to zetacore
for _, inTx := range inTxs {
msg := ob.GetInboundVoteMessageFromBtcEvent(inTx)
zetaHash, ballot, err := ob.zetaClient.PostSend(PostSendEVMGasLimit, msg)
if err != nil {
ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error posting to zeta core for tx %s", inTx.TxHash)
return err // we have to re-scan this block next time
} else if zetaHash != "" {
ob.logger.WatchInTx.Info().Msgf("observeInTxBTC: BTC deposit detected and reported: PostSend zeta tx: %s ballot %s", zetaHash, ballot)
}
}
// Save LastBlockHeight
ob.SetLastBlockHeightScanned(bn)
// #nosec G701 always positive
if err := ob.db.Save(clienttypes.ToLastBlockSQLType(uint64(bn))).Error; err != nil {
ob.logger.WatchInTx.Error().Err(err).Msgf("observeInTxBTC: error writing last scanned block %d to db", bn)
}
}
return nil
}
// ConfirmationsThreshold returns number of required Bitcoin confirmations depending on sent BTC amount.
func (ob *BitcoinChainClient) ConfirmationsThreshold(amount *big.Int) int64 {
if amount.Cmp(big.NewInt(200000000)) >= 0 {
return 6
}
return 2
}
// IsSendOutTxProcessed returns isIncluded(or inMempool), isConfirmed, Error
func (ob *BitcoinChainClient) IsSendOutTxProcessed(sendHash string, nonce uint64, _ common.CoinType, logger zerolog.Logger) (bool, bool, error) {
outTxID := ob.GetTxID(nonce)
logger.Info().Msgf("IsSendOutTxProcessed %s", outTxID)
ob.Mu.Lock()
txnHash, broadcasted := ob.broadcastedTx[outTxID]
res, included := ob.includedTxResults[outTxID]
ob.Mu.Unlock()
// Get original cctx parameters
params, err := ob.GetCctxParams(nonce)
if err != nil {
ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: can't find pending cctx for nonce %d", nonce)
return false, false, err
}
if !included {
if !broadcasted {
return false, false, nil
}
// If the broadcasted outTx is nonce 0, just wait for inclusion and don't schedule more keysign
// Schedule more than one keysign for nonce 0 can lead to duplicate payments.
// One purpose of nonce mark UTXO is to avoid duplicate payment based on the fact that Bitcoin
// prevents double spending of same UTXO. However, for nonce 0, we don't have a prior nonce (e.g., -1)
// for the signer to check against when making the payment. Signer treats nonce 0 as a special case in downstream code.
if nonce == 0 {
return true, false, nil
}
// Try including this outTx broadcasted by myself
txResult, inMempool := ob.checkIncludedTx(txnHash, params)
if txResult == nil {
ob.logger.ObserveOutTx.Error().Err(err).Msg("IsSendOutTxProcessed: checkIncludedTx failed")
return false, false, err
} else if inMempool { // still in mempool (should avoid unnecessary Tss keysign)
ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: outTx %s is still in mempool", outTxID)
return true, false, nil
}
// included
ob.setIncludedTx(nonce, txResult)
// Get tx result again in case it is just included
res = ob.getIncludedTx(nonce)
if res == nil {
return false, false, nil
}
ob.logger.ObserveOutTx.Info().Msgf("IsSendOutTxProcessed: setIncludedTx succeeded for outTx %s", outTxID)
}
// It's safe to use cctx's amount to post confirmation because it has already been verified in observeOutTx()
amountInSat := params.Amount.BigInt()
if res.Confirmations < ob.ConfirmationsThreshold(amountInSat) {
return true, false, nil
}
logger.Debug().Msgf("Bitcoin outTx confirmed: txid %s, amount %s\n", res.TxID, amountInSat.String())
zetaHash, ballot, err := ob.zetaClient.PostReceiveConfirmation(
sendHash,
res.TxID,
// #nosec G701 always positive
uint64(res.BlockIndex),
0, // gas used not used with Bitcoin
nil, // gas price not used with Bitcoin
0, // gas limit not used with Bitcoin
amountInSat,
common.ReceiveStatus_Success,
ob.chain,
nonce,
common.CoinType_Gas,
)
if err != nil {
logger.Error().Err(err).Msgf("IsSendOutTxProcessed: error confirming bitcoin outTx %s, nonce %d ballot %s", res.TxID, nonce, ballot)
} else if zetaHash != "" {
logger.Info().Msgf("IsSendOutTxProcessed: confirmed Bitcoin outTx %s, zeta tx hash %s nonce %d ballot %s", res.TxID, zetaHash, nonce, ballot)
}
return true, true, nil
}
func (ob *BitcoinChainClient) WatchGasPrice() {
ticker, err := NewDynamicTicker("Bitcoin_WatchGasPrice", ob.GetChainParams().GasPriceTicker)
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("WatchGasPrice error")
return
}
defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.PostGasPrice()
if err != nil {
ob.logger.WatchGasPrice.Error().Err(err).Msg("PostGasPrice error on " + ob.chain.String())
}
ticker.UpdateInterval(ob.GetChainParams().GasPriceTicker, ob.logger.WatchGasPrice)
case <-ob.stop:
ob.logger.WatchGasPrice.Info().Msg("WatchGasPrice stopped")
return
}
}
}
func (ob *BitcoinChainClient) PostGasPrice() error {
if ob.chain.ChainId == 18444 { //bitcoin regtest; hardcode here since this RPC is not available on regtest
bn, err := ob.rpcClient.GetBlockCount()
if err != nil {
return err
}
// #nosec G701 always in range
zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, 1, "100", uint64(bn))
if err != nil {
ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:")
return err
}
_ = zetaHash
//ob.logger.WatchGasPrice.Debug().Msgf("PostGasPrice zeta tx: %s", zetaHash)
return nil
}
// EstimateSmartFee returns the fees per kilobyte (BTC/kb) targeting given block confirmation
feeResult, err := ob.rpcClient.EstimateSmartFee(1, &btcjson.EstimateModeConservative)
if err != nil {
return err
}
if feeResult.Errors != nil || feeResult.FeeRate == nil {
return fmt.Errorf("error getting gas price: %s", feeResult.Errors)
}
if *feeResult.FeeRate > math.MaxInt64 {
return fmt.Errorf("gas price is too large: %f", *feeResult.FeeRate)
}
feeRatePerByte := FeeRateToSatPerByte(*feeResult.FeeRate)
bn, err := ob.rpcClient.GetBlockCount()
if err != nil {
return err
}
// #nosec G701 always positive
zetaHash, err := ob.zetaClient.PostGasPrice(ob.chain, feeRatePerByte.Uint64(), "100", uint64(bn))
if err != nil {
ob.logger.WatchGasPrice.Err(err).Msg("PostGasPrice:")
return err
}
_ = zetaHash
return nil
}
type BTCInTxEvnet struct {
FromAddress string // the first input address
ToAddress string // some TSS address
Value float64 // in BTC, not satoshi
MemoBytes []byte
BlockNumber uint64
TxHash string
}
// FilterAndParseIncomingTx given txs list returned by the "getblock 2" RPC command, return the txs that are relevant to us
// relevant tx must have the following vouts as the first two vouts:
// vout0: p2wpkh to the TSS address (targetAddress)
// vout1: OP_RETURN memo, base64 encoded
func FilterAndParseIncomingTx(
txs []btcjson.TxRawResult,
blockNumber uint64,
targetAddress string,
logger *zerolog.Logger,
chainID int64,
) []*BTCInTxEvnet {
inTxs := make([]*BTCInTxEvnet, 0)
for idx, tx := range txs {
if idx == 0 {
continue // the first tx is coinbase; we do not process coinbase tx
}
inTx, err := GetBtcEvent(tx, targetAddress, blockNumber, logger, chainID)
if err != nil {
logger.Error().Err(err).Msg("error getting btc event")
continue
}
if inTx != nil {
inTxs = append(inTxs, inTx)
}
}
return inTxs
}
func (ob *BitcoinChainClient) GetInboundVoteMessageFromBtcEvent(inTx *BTCInTxEvnet) *types.MsgVoteOnObservedInboundTx {
ob.logger.WatchInTx.Debug().Msgf("Processing inTx: %s", inTx.TxHash)
amount := big.NewFloat(inTx.Value)
amount = amount.Mul(amount, big.NewFloat(1e8))
amountInt, _ := amount.Int(nil)
message := hex.EncodeToString(inTx.MemoBytes)
return GetInBoundVoteMessage(
inTx.FromAddress,
ob.chain.ChainId,
inTx.FromAddress,
inTx.FromAddress,
ob.zetaClient.ZetaChain().ChainId,
cosmosmath.NewUintFromBigInt(amountInt),
message,
inTx.TxHash,
inTx.BlockNumber,
0,
common.CoinType_Gas,
"",
ob.zetaClient.GetKeys().GetOperatorAddress().String(),
0,
)
}
func GetBtcEvent(
tx btcjson.TxRawResult,
targetAddress string,
blockNumber uint64,
logger *zerolog.Logger,
chainID int64,
) (*BTCInTxEvnet, error) {
found := false
var value float64
var memo []byte
if len(tx.Vout) >= 2 {
// first vout must to addressed to the targetAddress with p2wpkh scriptPubKey
out := tx.Vout[0]
script := out.ScriptPubKey.Hex
if len(script) == 44 && script[:4] == "0014" { // segwit output: 0x00 + 20 bytes of pubkey hash
hash, err := hex.DecodeString(script[4:])
if err != nil {
return nil, err
}
bitcoinNetParams, err := common.BitcoinNetParamsFromChainID(chainID)
if err != nil {
return nil, fmt.Errorf("btc: error getting bitcoin net params : %v", err)
}
wpkhAddress, err := btcutil.NewAddressWitnessPubKeyHash(hash, bitcoinNetParams)
if err != nil {
return nil, err
}
if wpkhAddress.EncodeAddress() != targetAddress {
return nil, err
}
// deposit amount has to be no less than the minimum depositor fee
if out.Value < BtcDepositorFeeMin {
return nil, fmt.Errorf("btc deposit amount %v in txid %s is less than minimum depositor fee %v", value, tx.Txid, BtcDepositorFeeMin)
}
value = out.Value - BtcDepositorFeeMin
out = tx.Vout[1]
script = out.ScriptPubKey.Hex
if len(script) >= 4 && script[:2] == "6a" { // OP_RETURN
memoSize, err := strconv.ParseInt(script[2:4], 16, 32)
if err != nil {
return nil, errors.Wrapf(err, "error decoding pubkey hash")
}
if int(memoSize) != (len(script)-4)/2 {
return nil, fmt.Errorf("memo size mismatch: %d != %d", memoSize, (len(script)-4)/2)
}
memoBytes, err := hex.DecodeString(script[4:])
if err != nil {
logger.Warn().Err(err).Msgf("error hex decoding memo")
return nil, fmt.Errorf("error hex decoding memo: %s", err)
}
if bytes.Equal(memoBytes, []byte(DonationMessage)) {
logger.Info().Msgf("donation tx: %s; value %f", tx.Txid, value)
return nil, fmt.Errorf("donation tx: %s; value %f", tx.Txid, value)
}
memo = memoBytes
found = true
}
}
}
if found {
logger.Info().Msgf("found bitcoin intx: %s", tx.Txid)
var fromAddress string
if len(tx.Vin) > 0 {
vin := tx.Vin[0]
//log.Info().Msgf("vin: %v", vin.Witness)
if len(vin.Witness) == 2 {
pk := vin.Witness[1]
pkBytes, err := hex.DecodeString(pk)
if err != nil {
return nil, errors.Wrapf(err, "error decoding pubkey")
}
hash := btcutil.Hash160(pkBytes)
bitcoinNetParams, err := common.BitcoinNetParamsFromChainID(chainID)
if err != nil {
return nil, fmt.Errorf("btc: error getting bitcoin net params : %v", err)
}
addr, err := btcutil.NewAddressWitnessPubKeyHash(hash, bitcoinNetParams)
if err != nil {
return nil, errors.Wrapf(err, "error decoding pubkey hash")
}
fromAddress = addr.EncodeAddress()
}
}
return &BTCInTxEvnet{
FromAddress: fromAddress,
ToAddress: targetAddress,
Value: value,
MemoBytes: memo,
BlockNumber: blockNumber,
TxHash: tx.Txid,
}, nil
}
return nil, nil
}
func (ob *BitcoinChainClient) WatchUTXOS() {
ticker, err := NewDynamicTicker("Bitcoin_WatchUTXOS", ob.GetChainParams().WatchUtxoTicker)
if err != nil {
ob.logger.WatchUTXOS.Error().Err(err).Msg("WatchUTXOS error")
return
}
defer ticker.Stop()
for {
select {
case <-ticker.C():
err := ob.FetchUTXOS()
if err != nil {
ob.logger.WatchUTXOS.Error().Err(err).Msg("error fetching btc utxos")
}
ticker.UpdateInterval(ob.GetChainParams().WatchUtxoTicker, ob.logger.WatchUTXOS)
case <-ob.stop:
ob.logger.WatchUTXOS.Info().Msg("WatchUTXOS stopped")
return
}
}
}
func (ob *BitcoinChainClient) FetchUTXOS() error {
defer func() {
if err := recover(); err != nil {
ob.logger.WatchUTXOS.Error().Msgf("BTC fetchUTXOS: caught panic error: %v", err)
}
}()
// This is useful when a zetaclient's pending nonce lagged behind for whatever reason.
ob.refreshPendingNonce()
// get the current block height.
bh, err := ob.rpcClient.GetBlockCount()
if err != nil {
return fmt.Errorf("btc: error getting block height : %v", err)
}
maxConfirmations := int(bh)
// List all unspent UTXOs (160ms)
tssAddr := ob.Tss.BTCAddress()
address, err := common.DecodeBtcAddress(tssAddr, ob.chain.ChainId)
if err != nil {
return fmt.Errorf("btc: error decoding wallet address (%s) : %s", tssAddr, err.Error())
}
utxos, err := ob.rpcClient.ListUnspentMinMaxAddresses(0, maxConfirmations, []btcutil.Address{address})
if err != nil {
return err
}
// rigid sort to make utxo list deterministic
sort.SliceStable(utxos, func(i, j int) bool {
if utxos[i].Amount == utxos[j].Amount {
if utxos[i].TxID == utxos[j].TxID {
return utxos[i].Vout < utxos[j].Vout
}
return utxos[i].TxID < utxos[j].TxID
}
return utxos[i].Amount < utxos[j].Amount
})
// filter UTXOs good to spend for next TSS transaction
utxosFiltered := make([]btcjson.ListUnspentResult, 0)
for _, utxo := range utxos {
// UTXOs big enough to cover the cost of spending themselves
if utxo.Amount < BtcDepositorFeeMin {
continue
}
// we don't want to spend other people's unconfirmed UTXOs as they may not be safe to spend
if utxo.Confirmations == 0 {
if !ob.isTssTransaction(utxo.TxID) {
continue
}
}
utxosFiltered = append(utxosFiltered, utxo)
}
ob.Mu.Lock()
ob.ts.SetNumberOfUTXOs(len(utxosFiltered))
ob.utxos = utxosFiltered
ob.Mu.Unlock()
return nil
}
// isTssTransaction checks if a given transaction was sent by TSS itself.
// An unconfirmed transaction is safe to spend only if it was sent by TSS and verified by ourselves.
func (ob *BitcoinChainClient) isTssTransaction(txid string) bool {
_, found := ob.includedTxHashes[txid]
return found
}
// refreshPendingNonce tries increasing the artificial pending nonce of outTx (if lagged behind).
// There could be many (unpredictable) reasons for a pending nonce lagging behind, for example:
// 1. The zetaclient gets restarted.
// 2. The tracker is missing in zetacore.
func (ob *BitcoinChainClient) refreshPendingNonce() {
// get pending nonces from zetacore
p, err := ob.zetaClient.GetPendingNoncesByChain(ob.chain.ChainId)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting pending nonces")
}
// increase pending nonce if lagged behind
ob.Mu.Lock()
pendingNonce := ob.pendingNonce
ob.Mu.Unlock()
// #nosec G701 always non-negative
nonceLow := uint64(p.NonceLow)
if nonceLow > pendingNonce {
// get the last included outTx hash
txid, err := ob.getOutTxidByNonce(nonceLow-1, false)
if err != nil {
ob.logger.ChainLogger.Error().Err(err).Msg("refreshPendingNonce: error getting last outTx txid")
}
// set 'NonceLow' as the new pending nonce
ob.Mu.Lock()
defer ob.Mu.Unlock()
ob.pendingNonce = nonceLow
ob.logger.ChainLogger.Info().Msgf("refreshPendingNonce: increase pending nonce to %d with txid %s", ob.pendingNonce, txid)
}
}
func (ob *BitcoinChainClient) getOutTxidByNonce(nonce uint64, test bool) (string, error) {
// There are 2 types of txids an observer can trust
// 1. The ones had been verified and saved by observer self.
// 2. The ones had been finalized in zetacore based on majority vote.
if res := ob.getIncludedTx(nonce); res != nil {
return res.TxID, nil
}
if !test { // if not unit test, get cctx from zetacore
send, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, nonce)
if err != nil {
return "", errors.Wrapf(err, "getOutTxidByNonce: error getting cctx for nonce %d", nonce)
}
txid := send.GetCurrentOutTxParam().OutboundTxHash
if txid == "" {
return "", fmt.Errorf("getOutTxidByNonce: cannot find outTx txid for nonce %d", nonce)
}
// make sure it's a real Bitcoin txid
_, getTxResult, err := ob.GetTxResultByHash(txid)
if err != nil {
return "", errors.Wrapf(err, "getOutTxidByNonce: error getting outTx result for nonce %d hash %s", nonce, txid)
}
if getTxResult.Confirmations <= 0 { // just a double check
return "", fmt.Errorf("getOutTxidByNonce: outTx txid %s for nonce %d is not included", txid, nonce)
}
return txid, nil
}
return "", fmt.Errorf("getOutTxidByNonce: cannot find outTx txid for nonce %d", nonce)
}
func (ob *BitcoinChainClient) findNonceMarkUTXO(nonce uint64, txid string) (int, error) {
tssAddress := ob.Tss.BTCAddressWitnessPubkeyHash().EncodeAddress()
amount := common.NonceMarkAmount(nonce)
for i, utxo := range ob.utxos {
sats, err := GetSatoshis(utxo.Amount)
if err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msgf("findNonceMarkUTXO: error getting satoshis for utxo %v", utxo)
}
if utxo.Address == tssAddress && sats == amount && utxo.TxID == txid {
ob.logger.ObserveOutTx.Info().Msgf("findNonceMarkUTXO: found nonce-mark utxo with txid %s, amount %d satoshi", utxo.TxID, sats)
return i, nil
}
}
return -1, fmt.Errorf("findNonceMarkUTXO: cannot find nonce-mark utxo with nonce %d", nonce)
}
// SelectUTXOs selects a sublist of utxos to be used as inputs.
//
// Parameters:
// - amount: The desired minimum total value of the selected UTXOs.
// - utxos2Spend: The maximum number of UTXOs to spend.
// - nonce: The nonce of the outbound transaction.
// - consolidateRank: The rank below which UTXOs will be consolidated.
// - test: true for unit test only.
//
// Returns:
// - a sublist (includes previous nonce-mark) of UTXOs or an error if the qualifying sublist cannot be found.
// - the total value of the selected UTXOs.
// - the number of consolidated UTXOs.
// - the total value of the consolidated UTXOs.
func (ob *BitcoinChainClient) SelectUTXOs(amount float64, utxosToSpend uint16, nonce uint64, consolidateRank uint16, test bool) ([]btcjson.ListUnspentResult, float64, uint16, float64, error) {
idx := -1
if nonce == 0 {
// for nonce = 0; make exception; no need to include nonce-mark utxo
ob.Mu.Lock()
defer ob.Mu.Unlock()
} else {
// for nonce > 0; we proceed only when we see the nonce-mark utxo
preTxid, err := ob.getOutTxidByNonce(nonce-1, test)
if err != nil {
return nil, 0, 0, 0, err
}
ob.Mu.Lock()
defer ob.Mu.Unlock()
idx, err = ob.findNonceMarkUTXO(nonce-1, preTxid)
if err != nil {
return nil, 0, 0, 0, err
}
}
// select smallest possible UTXOs to make payment
total := 0.0
left, right := 0, 0
for total < amount && right < len(ob.utxos) {
if utxosToSpend > 0 { // expand sublist
total += ob.utxos[right].Amount
right++
utxosToSpend--
} else { // pop the smallest utxo and append the current one
total -= ob.utxos[left].Amount
total += ob.utxos[right].Amount
left++
right++
}
}
results := make([]btcjson.ListUnspentResult, right-left)
copy(results, ob.utxos[left:right])
// include nonce-mark as the 1st input
if idx >= 0 { // for nonce > 0
if idx < left || idx >= right {
total += ob.utxos[idx].Amount
results = append([]btcjson.ListUnspentResult{ob.utxos[idx]}, results...)
} else { // move nonce-mark to left
for i := idx - left; i > 0; i-- {
results[i], results[i-1] = results[i-1], results[i]
}
}
}
if total < amount {
return nil, 0, 0, 0, fmt.Errorf("SelectUTXOs: not enough btc in reserve - available : %v , tx amount : %v", total, amount)
}
// consolidate biggest possible UTXOs to maximize consolidated value
// consolidation happens only when there are more than (or equal to) consolidateRank (10) UTXOs
utxoRank, consolidatedUtxo, consolidatedValue := uint16(0), uint16(0), 0.0
for i := len(ob.utxos) - 1; i >= 0 && utxosToSpend > 0; i-- { // iterate over UTXOs big-to-small
if i != idx && (i < left || i >= right) { // exclude nonce-mark and already selected UTXOs
utxoRank++
if utxoRank >= consolidateRank { // consolication starts from the 10-ranked UTXO based on value
utxosToSpend--
consolidatedUtxo++
total += ob.utxos[i].Amount
consolidatedValue += ob.utxos[i].Amount
results = append(results, ob.utxos[i])
}
}
}
return results, total, consolidatedUtxo, consolidatedValue, nil
}
// SaveBroadcastedTx saves successfully broadcasted transaction
func (ob *BitcoinChainClient) SaveBroadcastedTx(txHash string, nonce uint64) {
outTxID := ob.GetTxID(nonce)
ob.Mu.Lock()
ob.broadcastedTx[outTxID] = txHash
ob.Mu.Unlock()
broadcastEntry := clienttypes.ToOutTxHashSQLType(txHash, outTxID)
if err := ob.db.Save(&broadcastEntry).Error; err != nil {
ob.logger.ObserveOutTx.Error().Err(err).Msgf("SaveBroadcastedTx: error saving broadcasted txHash %s for outTx %s", txHash, outTxID)
}
ob.logger.ObserveOutTx.Info().Msgf("SaveBroadcastedTx: saved broadcasted txHash %s for outTx %s", txHash, outTxID)
}
func (ob *BitcoinChainClient) GetCctxParams(nonce uint64) (types.OutboundTxParams, error) {
send, err := ob.zetaClient.GetCctxByNonce(ob.chain.ChainId, nonce)
if err != nil {
return types.OutboundTxParams{}, err
}
if send.GetCurrentOutTxParam() == nil { // never happen
return types.OutboundTxParams{}, fmt.Errorf("GetPendingCctx: nil outbound tx params")
}
return *send.GetCurrentOutTxParam(), nil
}