1
1
package zetaclient
2
2
3
3
import (
4
+ "bytes"
4
5
"context"
5
6
"fmt"
6
7
"math"
7
8
"math/big"
8
9
"os"
9
10
"sort"
10
11
"strconv"
12
+ "strings"
11
13
"sync"
12
14
"sync/atomic"
13
15
"time"
@@ -22,6 +24,7 @@ import (
22
24
"github.com/ethereum/go-ethereum/ethclient"
23
25
"github.com/ethereum/go-ethereum/rlp"
24
26
lru "github.com/hashicorp/golang-lru"
27
+ "github.com/onrik/ethrpc"
25
28
"github.com/pkg/errors"
26
29
"github.com/rs/zerolog"
27
30
"github.com/rs/zerolog/log"
@@ -70,6 +73,7 @@ type EVMChainClient struct {
70
73
* ChainMetrics
71
74
chain common.Chain
72
75
evmClient EVMRPCClient
76
+ evmClientAlternate * ethrpc.EthRPC // a fallback rpc client
73
77
KlaytnClient KlaytnRPCClient
74
78
zetaClient ZetaCoreBridger
75
79
Tss TSSSigner
@@ -92,7 +96,9 @@ type EVMChainClient struct {
92
96
params observertypes.ChainParams
93
97
ts * TelemetryServer
94
98
95
- BlockCache * lru.Cache
99
+ blockCache * lru.Cache
100
+ blockCacheV3 * lru.Cache // blockCacheV3 caches blocks containing type-3 (BlobTxType) transactions
101
+ headerCache * lru.Cache
96
102
}
97
103
98
104
var _ ChainClient = (* EVMChainClient )(nil )
@@ -146,12 +152,24 @@ func NewEVMChainClient(
146
152
return nil , err
147
153
}
148
154
ob .evmClient = client
155
+ ob .evmClientAlternate = ethrpc .NewEthRPC (evmCfg .Endpoint )
149
156
150
- ob .BlockCache , err = lru .New (1000 )
157
+ // create block header and block caches
158
+ ob .blockCache , err = lru .New (1000 )
151
159
if err != nil {
152
160
ob .logger .ChainLogger .Error ().Err (err ).Msg ("failed to create block cache" )
153
161
return nil , err
154
162
}
163
+ ob .blockCacheV3 , err = lru .New (1000 )
164
+ if err != nil {
165
+ ob .logger .ChainLogger .Error ().Err (err ).Msg ("failed to create block cache v3" )
166
+ return nil , err
167
+ }
168
+ ob .headerCache , err = lru .New (1000 )
169
+ if err != nil {
170
+ ob .logger .ChainLogger .Error ().Err (err ).Msg ("failed to create header cache" )
171
+ return nil , err
172
+ }
155
173
156
174
if ob .chain .IsKlaytnChain () {
157
175
client , err := Dial (evmCfg .Endpoint )
@@ -756,36 +774,56 @@ func (ob *EVMChainClient) checkConfirmedTx(txHash string, nonce uint64) (*ethtyp
756
774
return nil , nil , false
757
775
}
758
776
759
- // cross-check receipt against the block
760
- block , err := ob .GetBlockByNumberCached (receipt .BlockNumber .Uint64 ())
761
- if err != nil {
762
- log .Error ().Err (err ).Msgf ("confirmTxByHash: GetBlockByNumberCached error, txHash %s nonce %d block %d" ,
763
- txHash , nonce , receipt .BlockNumber )
764
- return nil , nil , false
765
- }
766
- // #nosec G701 non negative value
767
- if receipt .TransactionIndex >= uint (len (block .Transactions ())) {
768
- log .Error ().Msgf ("confirmTxByHash: transaction index %d out of range [0, %d), txHash %s nonce %d block %d" ,
769
- receipt .TransactionIndex , len (block .Transactions ()), txHash , nonce , receipt .BlockNumber )
770
- return nil , nil , false
771
- }
772
- txAtIndex := block .Transactions ()[receipt .TransactionIndex ]
773
- if txAtIndex .Hash () != transaction .Hash () {
774
- log .Error ().Msgf ("confirmTxByHash: transaction at index %d has different hash %s, txHash %s nonce %d block %d" ,
775
- receipt .TransactionIndex , txAtIndex .Hash ().Hex (), txHash , nonce , receipt .BlockNumber )
776
- return nil , nil , false
777
- }
778
-
779
777
// check confirmations
780
778
if ! ob .HasEnoughConfirmations (receipt , ob .GetLastBlockHeight ()) {
781
779
log .Debug ().Msgf ("confirmTxByHash: txHash %s nonce %d included but not confirmed: receipt block %d, current block %d" ,
782
780
txHash , nonce , receipt .BlockNumber , ob .GetLastBlockHeight ())
783
781
return nil , nil , false
784
782
}
785
783
784
+ // cross-check tx inclusion against the block
785
+ // Note: a guard for false BlockNumber in receipt. The blob-carrying tx won't come here
786
+ err = ob .checkTxInclusion (transaction , receipt .BlockNumber .Uint64 (), receipt .TransactionIndex )
787
+ if err != nil {
788
+ log .Error ().Err (err ).Msgf ("confirmTxByHash: checkTxInclusion error for txHash %s nonce %d" , txHash , nonce )
789
+ return nil , nil , false
790
+ }
791
+
786
792
return receipt , transaction , true
787
793
}
788
794
795
+ // checkTxInclusion returns nil only if tx is included in the block at blockNumber and txIndex
796
+ func (ob * EVMChainClient ) checkTxInclusion (tx * ethtypes.Transaction , blockNumber uint64 , txIndex uint ) error {
797
+ block , blockRPC , fallBack , _ , err := ob .GetBlockByNumberCached (blockNumber )
798
+ if err != nil {
799
+ return fmt .Errorf ("GetBlockByNumberCached error for block %d txHash %s nonce %d: %w" , blockNumber , tx .Hash (), tx .Nonce (), err )
800
+ }
801
+ if ! fallBack {
802
+ // #nosec G701 non negative value
803
+ if txIndex >= uint (len (block .Transactions ())) {
804
+ return fmt .Errorf ("transaction index %d out of range [0, %d), txHash %s nonce %d block %d" ,
805
+ txIndex , len (block .Transactions ()), tx .Hash (), tx .Nonce (), blockNumber )
806
+ }
807
+ txAtIndex := block .Transactions ()[txIndex ]
808
+ if txAtIndex .Hash () != tx .Hash () {
809
+ return fmt .Errorf ("transaction at index %d has different hash %s, txHash %s nonce %d block %d" ,
810
+ txIndex , txAtIndex .Hash ().Hex (), tx .Hash (), tx .Nonce (), blockNumber )
811
+ }
812
+ } else { // fell back on ETH RPC as ethclient failed to parse the block
813
+ // #nosec G701 non negative value
814
+ if txIndex >= uint (len (blockRPC .Transactions )) {
815
+ return fmt .Errorf ("transaction index %d out of range [0, %d), txHash %s nonce %d block %d" ,
816
+ txIndex , len (block .Transactions ()), tx .Hash (), tx .Nonce (), blockNumber )
817
+ }
818
+ txAtIndex := blockRPC .Transactions [txIndex ]
819
+ if ethcommon .HexToHash (txAtIndex .Hash ) != tx .Hash () {
820
+ return fmt .Errorf ("transaction at index %d has different hash %s, txHash %s nonce %d block %d" ,
821
+ txIndex , txAtIndex .Hash , tx .Hash (), tx .Nonce (), blockNumber )
822
+ }
823
+ }
824
+ return nil
825
+ }
826
+
789
827
// SetLastBlockHeightScanned set last block height scanned (not necessarily caught up with external block; could be slow/paused)
790
828
func (ob * EVMChainClient ) SetLastBlockHeightScanned (height uint64 ) {
791
829
atomic .StoreUint64 (& ob .lastBlockScanned , height )
@@ -863,21 +901,21 @@ func (ob *EVMChainClient) postBlockHeader(tip uint64) error {
863
901
return fmt .Errorf ("postBlockHeader: must post block confirmed block header: %d > %d" , bn , tip )
864
902
}
865
903
866
- block , err := ob .GetBlockByNumberCached (bn )
904
+ header , err := ob .GetBlockHeaderCached (bn )
867
905
if err != nil {
868
906
ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf ("postBlockHeader: error getting block: %d" , bn )
869
907
return err
870
908
}
871
- headerRLP , err := rlp .EncodeToBytes (block . Header () )
909
+ headerRLP , err := rlp .EncodeToBytes (header )
872
910
if err != nil {
873
911
ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf ("postBlockHeader: error encoding block header: %d" , bn )
874
912
return err
875
913
}
876
914
877
915
_ , err = ob .zetaClient .PostAddBlockHeader (
878
916
ob .chain .ChainId ,
879
- block .Hash ().Bytes (),
880
- block .Number () .Int64 (),
917
+ header .Hash ().Bytes (),
918
+ header .Number .Int64 (),
881
919
common .NewEthereumHeader (headerRLP ),
882
920
)
883
921
if err != nil {
@@ -1159,48 +1197,39 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse
1159
1197
}
1160
1198
1161
1199
// TODO: we can track the total number of 'getBlockByNumber' RPC calls made
1162
- block , err := ob .GetBlockByNumberCached (bn )
1200
+ block , blockRPC , fallBack , skip , err := ob .GetBlockByNumberCached (bn )
1163
1201
if err != nil {
1202
+ if skip {
1203
+ ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf ("observeTssRecvd: skip block %d for chain %d" , bn , ob .chain .ChainId )
1204
+ continue
1205
+ }
1164
1206
ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf ("observeTssRecvd: error getting block %d for chain %d" , bn , ob .chain .ChainId )
1165
1207
return bn - 1 // we have to re-scan from this block next time
1166
1208
}
1167
- for _ , tx := range block .Transactions () {
1168
- if tx .To () == nil {
1169
- continue
1170
- }
1171
-
1172
- if * tx .To () == tssAddress {
1173
- receipt , err := ob .evmClient .TransactionReceipt (context .Background (), tx .Hash ())
1174
- if err != nil {
1175
- ob .logger .ExternalChainWatcher .Err (err ).Msgf (
1176
- "observeTssRecvd: TransactionReceipt error for tx %s chain %d" , tx .Hash ().Hex (), ob .chain .ChainId )
1177
- return bn - 1 // we have to re-scan this block next time
1178
- }
1179
- if receipt .Status != 1 { // 1: successful, 0: failed
1180
- ob .logger .ExternalChainWatcher .Info ().Msgf ("observeTssRecvd: tx %s chain %d failed; don't act" , tx .Hash ().Hex (), ob .chain .ChainId )
1181
- continue
1182
- }
1183
-
1184
- sender , err := ob .GetTransactionSender (tx , block .Hash (), receipt .TransactionIndex )
1185
- if err != nil {
1186
- ob .logger .ExternalChainWatcher .Err (err ).Msgf (
1187
- "observeTssRecvd: GetTransactionSender error for tx %s chain %d" , tx .Hash ().Hex (), ob .chain .ChainId )
1188
- return bn - 1 // we have to re-scan this block next time
1189
- }
1190
-
1191
- msg := ob .GetInboundVoteMsgForTokenSentToTSS (tx , sender , receipt .BlockNumber .Uint64 ())
1192
- if msg == nil {
1193
- continue
1209
+ if ! fallBack {
1210
+ for _ , tx := range block .Transactions () {
1211
+ if tx .To () != nil && * tx .To () == tssAddress {
1212
+ if ok := ob .processIntxToTss (tx , bn , block .Hash ()); ! ok {
1213
+ return bn - 1 // we have to re-scan this block next time
1214
+ }
1194
1215
}
1195
- zetaHash , ballot , err := ob .zetaClient .PostVoteInbound (PostVoteInboundGasLimit , PostVoteInboundExecutionGasLimit , msg )
1196
- if err != nil {
1197
- ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf (
1198
- "observeTssRecvd: error posting to zeta core for tx %s at height %d for chain %d" , tx .Hash ().Hex (), bn , ob .chain .ChainId )
1199
- return bn - 1 // we have to re-scan this block next time
1200
- } else if zetaHash != "" {
1201
- ob .logger .ExternalChainWatcher .Info ().Msgf (
1202
- "observeTssRecvd: gas asset deposit detected in tx %s at height %d for chain %d, PostVoteInbound zeta tx: %s ballot %s" ,
1203
- tx .Hash ().Hex (), bn , ob .chain .ChainId , zetaHash , ballot )
1216
+ }
1217
+ } else { // fell back on ETH RPC as ethclient failed to parse the block
1218
+ ob .logger .ExternalChainWatcher .Info ().Msgf ("observeTssRecvd: processing block %d using fallback for chain %d" , bn , ob .chain .ChainId )
1219
+ for _ , txRPC := range blockRPC .Transactions {
1220
+ if ethcommon .HexToAddress (txRPC .To ) == tssAddress {
1221
+ tx , _ , err := ob .evmClient .TransactionByHash (context .Background (), ethcommon .HexToHash (txRPC .Hash ))
1222
+ if err != nil {
1223
+ if strings .Contains (err .Error (), "transaction type not supported" ) {
1224
+ ob .logger .ExternalChainWatcher .Err (err ).Msgf (
1225
+ "observeTssRecvd: transaction type not supported for tx %s chain %d" , txRPC .Hash , ob .chain .ChainId )
1226
+ continue // skip blob-carrying tx to TSS address
1227
+ }
1228
+ return bn - 1 // we have to re-scan this block next time
1229
+ }
1230
+ if ok := ob .processIntxToTss (tx , bn , ethcommon .HexToHash (blockRPC .Hash )); ! ok {
1231
+ return bn - 1 // we have to re-scan this block next time
1232
+ }
1204
1233
}
1205
1234
}
1206
1235
}
@@ -1209,6 +1238,48 @@ func (ob *EVMChainClient) observeTssRecvd(startBlock, toBlock uint64, flags obse
1209
1238
return toBlock
1210
1239
}
1211
1240
1241
+ // processIntxToTss processes the incoming tx to TSS address and posts to zetacore
1242
+ // returns true if the tx is successfully processed, false otherwise
1243
+ func (ob * EVMChainClient ) processIntxToTss (tx * ethtypes.Transaction , bn uint64 , blockHash ethcommon.Hash ) bool {
1244
+ receipt , err := ob .evmClient .TransactionReceipt (context .Background (), tx .Hash ())
1245
+ if err != nil {
1246
+ ob .logger .ExternalChainWatcher .Err (err ).Msgf (
1247
+ "processIntxToTss: TransactionReceipt error for tx %s chain %d" , tx .Hash ().Hex (), ob .chain .ChainId )
1248
+ return false // we have to re-scan this block next time
1249
+ }
1250
+ if receipt .Status != 1 { // 1: successful, 0: failed
1251
+ ob .logger .ExternalChainWatcher .Info ().Msgf ("processIntxToTss: tx %s chain %d failed; don't act" , tx .Hash ().Hex (), ob .chain .ChainId )
1252
+ return true // skip failed tx
1253
+ }
1254
+ if bytes .Equal (tx .Data (), []byte (DonationMessage )) {
1255
+ ob .logger .ExternalChainWatcher .Info ().Msgf (
1256
+ "processIntxToTss: thank you rich folk for your donation!: %s chain %d" , tx .Hash ().Hex (), ob .chain .ChainId )
1257
+ return true // skip donation tx
1258
+ }
1259
+ sender , err := ob .GetTransactionSender (tx , blockHash , receipt .TransactionIndex )
1260
+ if err != nil {
1261
+ ob .logger .ExternalChainWatcher .Err (err ).Msgf (
1262
+ "processIntxToTss: GetTransactionSender error for tx %s chain %d" , tx .Hash ().Hex (), ob .chain .ChainId )
1263
+ return false // we have to re-scan this block next time
1264
+ }
1265
+
1266
+ msg := ob .GetInboundVoteMsgForTokenSentToTSS (tx , sender , bn )
1267
+ if msg == nil {
1268
+ return true // should never happen, always non-nil
1269
+ }
1270
+ zetaHash , ballot , err := ob .zetaClient .PostVoteInbound (PostVoteInboundGasLimit , PostVoteInboundExecutionGasLimit , msg )
1271
+ if err != nil {
1272
+ ob .logger .ExternalChainWatcher .Error ().Err (err ).Msgf (
1273
+ "processIntxToTss: error posting to zeta core for tx %s at height %d for chain %d" , tx .Hash ().Hex (), bn , ob .chain .ChainId )
1274
+ return false // we have to re-scan this block next time
1275
+ } else if zetaHash != "" {
1276
+ ob .logger .ExternalChainWatcher .Info ().Msgf (
1277
+ "processIntxToTss: gas asset deposit detected in tx %s at height %d for chain %d, PostSend zeta tx: %s ballot %s" ,
1278
+ tx .Hash ().Hex (), bn , ob .chain .ChainId , zetaHash , ballot )
1279
+ }
1280
+ return true
1281
+ }
1282
+
1212
1283
func (ob * EVMChainClient ) WatchGasPrice () {
1213
1284
ob .logger .WatchGasPrice .Info ().Msg ("WatchGasPrice starting..." )
1214
1285
err := ob .PostGasPrice ()
@@ -1411,15 +1482,45 @@ func (ob *EVMChainClient) GetTxID(nonce uint64) string {
1411
1482
return fmt .Sprintf ("%d-%s-%d" , ob .chain .ChainId , tssAddr , nonce )
1412
1483
}
1413
1484
1414
- func (ob * EVMChainClient ) GetBlockByNumberCached (blockNumber uint64 ) (* ethtypes.Block , error ) {
1415
- if block , ok := ob .BlockCache .Get (blockNumber ); ok {
1416
- return block .(* ethtypes.Block ), nil
1485
+ func (ob * EVMChainClient ) GetBlockHeaderCached (blockNumber uint64 ) (* ethtypes.Header , error ) {
1486
+ if header , ok := ob .headerCache .Get (blockNumber ); ok {
1487
+ return header .(* ethtypes.Header ), nil
1417
1488
}
1418
- block , err := ob .evmClient .BlockByNumber (context .Background (), new (big.Int ).SetUint64 (blockNumber ))
1489
+ header , err := ob .evmClient .HeaderByNumber (context .Background (), new (big.Int ).SetUint64 (blockNumber ))
1419
1490
if err != nil {
1420
1491
return nil , err
1421
1492
}
1422
- ob .BlockCache .Add (blockNumber , block )
1423
- ob .BlockCache .Add (block .Hash (), block )
1424
- return block , nil
1493
+ ob .headerCache .Add (blockNumber , header )
1494
+ return header , nil
1495
+ }
1496
+
1497
+ // GetBlockByNumberCached get block by number from cache
1498
+ // returns block, ethrpc.Block, isFallback, isSkip, error
1499
+ func (ob * EVMChainClient ) GetBlockByNumberCached (blockNumber uint64 ) (* ethtypes.Block , * ethrpc.Block , bool , bool , error ) {
1500
+ if block , ok := ob .blockCache .Get (blockNumber ); ok {
1501
+ return block .(* ethtypes.Block ), nil , false , false , nil
1502
+ }
1503
+ if block , ok := ob .blockCacheV3 .Get (blockNumber ); ok {
1504
+ return nil , block .(* ethrpc.Block ), true , false , nil
1505
+ }
1506
+ block , err := ob .evmClient .BlockByNumber (context .Background (), new (big.Int ).SetUint64 (blockNumber ))
1507
+ if err != nil {
1508
+ if strings .Contains (err .Error (), "block header indicates no transactions" ) {
1509
+ return nil , nil , false , true , err // it's ok skip empty block
1510
+ } else if strings .Contains (err .Error (), "transaction type not supported" ) {
1511
+ if blockNumber > math .MaxInt32 {
1512
+ return nil , nil , true , false , fmt .Errorf ("block number %d is too large" , blockNumber )
1513
+ }
1514
+ // #nosec G701 always in range, checked above
1515
+ rpcBlock , err := ob .evmClientAlternate .EthGetBlockByNumber (int (blockNumber ), true )
1516
+ if err != nil {
1517
+ return nil , nil , true , false , err // fall back on ethRPC but still fail
1518
+ }
1519
+ ob .blockCacheV3 .Add (blockNumber , rpcBlock )
1520
+ return nil , rpcBlock , true , false , nil // fall back on ethRPC without error
1521
+ }
1522
+ return nil , nil , false , false , err
1523
+ }
1524
+ ob .blockCache .Add (blockNumber , block )
1525
+ return block , nil , false , false , nil
1425
1526
}
0 commit comments