diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env index 3af94987915..202ac920401 100644 --- a/packages/relayer/.default.env +++ b/packages/relayer/.default.env @@ -7,4 +7,5 @@ RELAYER_ECDSA_KEY= L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10 L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10 L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_" -L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws" \ No newline at end of file +L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws" +CONFIRMATIONS_BEFORE_PROCESSING=15 \ No newline at end of file diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index d9a6b9dd7cf..2a77b22d864 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -25,7 +25,7 @@ linters: linters-settings: funlen: - lines: 100 + lines: 105 statements: 45 gocognit: min-complexity: 32 diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index 46cfbb19b42..8c87b4cb427 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" @@ -32,7 +33,10 @@ var ( "MYSQL_DATABASE", "MYSQL_HOST", "RELAYER_ECDSA_KEY", + "CONFIRMATIONS_BEFORE_PROCESSING", } + + defaultConfirmations = 15 ) func Run(mode relayer.Mode, layer relayer.Layer) { @@ -107,6 +111,11 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), return nil, nil, err } + confirmations, err := strconv.Atoi(os.Getenv("CONFIRMATIONS_BEFORE_PROCESSING")) + if err != nil || confirmations <= 0 { + confirmations = defaultConfirmations + } + indexers := make([]*indexer.Service, 0) if layer == relayer.L1 || layer == relayer.Both { @@ -122,6 +131,8 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), BridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")), + + Confirmations: uint64(confirmations), }) if err != nil { log.Fatal(err) @@ -143,6 +154,8 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), BridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")), DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), + + Confirmations: uint64(confirmations), }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/errors.go b/packages/relayer/errors.go index 254dc5571e4..3c0f0a83303 100644 --- a/packages/relayer/errors.go +++ b/packages/relayer/errors.go @@ -15,5 +15,10 @@ var ( ErrNoRPCClient = errors.Validation.NewWithKeyAndDetail("ERR_NO_RPC_CLIENT", "RPCClient is required") ErrNoBridge = errors.Validation.NewWithKeyAndDetail("ERR_NO_BRIDGE", "Bridge is required") ErrNoTaikoL2 = errors.Validation.NewWithKeyAndDetail("ERR_NO_TAIKO_L2", "TaikoL2 is required") + + ErrInvalidConfirmations = errors.Validation.NewWithKeyAndDetail( + "ERR_INVALID_CONFIRMATIONS", + "Confirmations amount is invalid, must be numerical and > 0", + ) ErrInvalidMode = errors.Validation.NewWithKeyAndDetail("ERR_INVALID_MODE", "Mode not supported") ) diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index 4b98e26a2e9..da60d17b312 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -53,6 +53,7 @@ type NewServiceOpts struct { BridgeAddress common.Address DestBridgeAddress common.Address DestTaikoAddress common.Address + Confirmations uint64 } func NewService(opts NewServiceOpts) (*Service, error) { @@ -130,6 +131,8 @@ func NewService(opts NewServiceOpts) (*Service, error) { DestBridge: destBridge, EventRepo: opts.EventRepo, DestHeaderSyncer: destHeaderSyncer, + Confirmations: opts.Confirmations, + SrcETHClient: opts.EthClient, }) if err != nil { return nil, errors.Wrap(err, "message.NewProcessor") diff --git a/packages/relayer/indexer/service_test.go b/packages/relayer/indexer/service_test.go index a9437fe372d..8f6b22bcef1 100644 --- a/packages/relayer/indexer/service_test.go +++ b/packages/relayer/indexer/service_test.go @@ -40,6 +40,7 @@ func Test_NewService(t *testing.T) { ECDSAKey: dummyEcdsaKey, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, nil, }, @@ -53,6 +54,7 @@ func Test_NewService(t *testing.T) { ECDSAKey: dummyEcdsaKey, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoRPCClient, }, @@ -66,6 +68,7 @@ func Test_NewService(t *testing.T) { ECDSAKey: dummyEcdsaKey, RPCClient: &rpc.Client{}, DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoBridgeAddress, }, @@ -79,6 +82,7 @@ func Test_NewService(t *testing.T) { ECDSAKey: dummyEcdsaKey, RPCClient: &rpc.Client{}, BridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoBridgeAddress, }, @@ -92,6 +96,7 @@ func Test_NewService(t *testing.T) { DestEthClient: ðclient.Client{}, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoECDSAKey, }, @@ -105,6 +110,7 @@ func Test_NewService(t *testing.T) { BridgeAddress: common.HexToAddress(dummyAddress), RPCClient: &rpc.Client{}, DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoEventRepository, }, @@ -118,6 +124,7 @@ func Test_NewService(t *testing.T) { DestEthClient: ðclient.Client{}, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoBlockRepository, }, @@ -131,6 +138,7 @@ func Test_NewService(t *testing.T) { DestEthClient: ðclient.Client{}, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoEthClient, }, @@ -144,6 +152,7 @@ func Test_NewService(t *testing.T) { RPCClient: &rpc.Client{}, BridgeAddress: common.HexToAddress(dummyAddress), DestBridgeAddress: common.HexToAddress(dummyAddress), + Confirmations: 1, }, relayer.ErrNoEthClient, }, diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 5b895258b6d..160244b9f99 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -29,6 +30,10 @@ func (p *Processor) ProcessMessage( return errors.New("only user can process this, gasLimit set to 0") } + if err := p.waitForConfirmations(ctx, event.Raw.TxHash, event.Raw.BlockNumber); err != nil { + return errors.Wrap(err, "p.waitForConfirmations") + } + // get latest synced header since not every header is synced from L1 => L2, // and later blocks still have the storage trie proof from previous blocks. latestSyncedHeader, err := p.destHeaderSyncer.GetLatestSyncedHeader(&bind.CallOpts{}) @@ -116,3 +121,21 @@ func (p *Processor) ProcessMessage( return nil } + +func (p *Processor) waitForConfirmations(ctx context.Context, txHash common.Hash, blockNumber uint64) error { + // TODO: make timeout a config var + ctx, cancelFunc := context.WithTimeout(ctx, 2*time.Minute) + + defer cancelFunc() + + if err := relayer.WaitConfirmations( + ctx, + p.srcEthClient, + p.confirmations, + txHash, + ); err != nil { + return errors.Wrap(err, "relayer.WaitConfirmations") + } + + return nil +} diff --git a/packages/relayer/message/processor.go b/packages/relayer/message/processor.go index 65ad44fff17..34ef0d0f192 100644 --- a/packages/relayer/message/processor.go +++ b/packages/relayer/message/processor.go @@ -13,6 +13,7 @@ import ( type Processor struct { eventRepo relayer.EventRepository + srcEthClient *ethclient.Client destEthClient *ethclient.Client rpc *rpc.Client ecdsaKey *ecdsa.PrivateKey @@ -21,16 +22,20 @@ type Processor struct { destHeaderSyncer *contracts.IHeaderSync prover *proof.Prover + + confirmations uint64 } type NewProcessorOpts struct { Prover *proof.Prover ECDSAKey *ecdsa.PrivateKey RPCClient *rpc.Client + SrcETHClient *ethclient.Client DestETHClient *ethclient.Client DestBridge *contracts.Bridge EventRepo relayer.EventRepository DestHeaderSyncer *contracts.IHeaderSync + Confirmations uint64 } func NewProcessor(opts NewProcessorOpts) (*Processor, error) { @@ -50,6 +55,10 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) { return nil, relayer.ErrNoEthClient } + if opts.SrcETHClient == nil { + return nil, relayer.ErrNoEthClient + } + if opts.DestBridge == nil { return nil, relayer.ErrNoBridge } @@ -62,13 +71,22 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) { return nil, relayer.ErrNoTaikoL2 } + if opts.Confirmations == 0 { + return nil, relayer.ErrInvalidConfirmations + } + return &Processor{ - eventRepo: opts.EventRepo, - prover: opts.Prover, - ecdsaKey: opts.ECDSAKey, - rpc: opts.RPCClient, + eventRepo: opts.EventRepo, + prover: opts.Prover, + ecdsaKey: opts.ECDSAKey, + rpc: opts.RPCClient, + + srcEthClient: opts.SrcETHClient, + destEthClient: opts.DestETHClient, destBridge: opts.DestBridge, destHeaderSyncer: opts.DestHeaderSyncer, + + confirmations: opts.Confirmations, }, nil } diff --git a/packages/relayer/message/processor_test.go b/packages/relayer/message/processor_test.go index 53a695cd882..ee833270f24 100644 --- a/packages/relayer/message/processor_test.go +++ b/packages/relayer/message/processor_test.go @@ -25,21 +25,53 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, EventRepo: &repo.EventRepository{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, nil, }, + { + "errNoConfirmations", + NewProcessorOpts{ + Prover: &proof.Prover{}, + ECDSAKey: &ecdsa.PrivateKey{}, + RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, + DestETHClient: ðclient.Client{}, + DestBridge: &contracts.Bridge{}, + EventRepo: &repo.EventRepository{}, + DestHeaderSyncer: &contracts.IHeaderSync{}, + }, + relayer.ErrInvalidConfirmations, + }, + { + "errNoSrcClient", + NewProcessorOpts{ + Prover: &proof.Prover{}, + ECDSAKey: &ecdsa.PrivateKey{}, + RPCClient: &rpc.Client{}, + DestETHClient: ðclient.Client{}, + DestBridge: &contracts.Bridge{}, + EventRepo: &repo.EventRepository{}, + DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, + }, + relayer.ErrNoEthClient, + }, { "errNoProver", NewProcessorOpts{ ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, EventRepo: &repo.EventRepository{}, + Confirmations: 1, DestHeaderSyncer: &contracts.IHeaderSync{}, }, relayer.ErrNoProver, @@ -50,10 +82,12 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, EventRepo: &repo.EventRepository{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, relayer.ErrNoECDSAKey, }, @@ -62,10 +96,12 @@ func Test_NewProcessor(t *testing.T) { NewProcessorOpts{ Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, EventRepo: &repo.EventRepository{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, relayer.ErrNoRPCClient, }, @@ -75,9 +111,11 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, EventRepo: &repo.EventRepository{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, relayer.ErrNoEthClient, }, @@ -87,9 +125,11 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, EventRepo: &repo.EventRepository{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, relayer.ErrNoBridge, }, @@ -99,9 +139,11 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, DestBridge: &contracts.Bridge{}, DestHeaderSyncer: &contracts.IHeaderSync{}, + Confirmations: 1, }, relayer.ErrNoEventRepository, }, @@ -111,9 +153,11 @@ func Test_NewProcessor(t *testing.T) { Prover: &proof.Prover{}, ECDSAKey: &ecdsa.PrivateKey{}, RPCClient: &rpc.Client{}, + SrcETHClient: ðclient.Client{}, DestETHClient: ðclient.Client{}, EventRepo: &repo.EventRepository{}, DestBridge: &contracts.Bridge{}, + Confirmations: 1, }, relayer.ErrNoTaikoL2, }, diff --git a/packages/relayer/types.go b/packages/relayer/types.go index a25a9e5492d..2bf6f601e7b 100644 --- a/packages/relayer/types.go +++ b/packages/relayer/types.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" @@ -49,3 +50,37 @@ func WaitReceipt(ctx context.Context, client *ethclient.Client, tx *types.Transa } } } + +// WaitConfirmations won't return before N blocks confirmations have been seen +// on destination chain. +func WaitConfirmations(ctx context.Context, client *ethclient.Client, confirmations uint64, txHash common.Hash) error { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + receipt, err := client.TransactionReceipt(ctx, txHash) + if err != nil { + if err == ethereum.NotFound { + continue + } + + return err + } + + latest, err := client.BlockNumber(ctx) + if err != nil { + return err + } + + if latest < receipt.BlockNumber.Uint64()+confirmations { + continue + } + + return nil + } + } +}