From 2c07f05ab9f74980e53b994c2734bfd046619249 Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 07:42:47 +0545 Subject: [PATCH 01/14] fix: add missing mutex in query proof --- relayer/chains/wasm/query.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 44176237f..c12228f17 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -334,12 +334,12 @@ func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeig } func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (*wasmtypes.QuerySmartContractStateResponse, error) { - done := ap.SetSDKContext() - defer done() + done := ap.SetSDKContext() + defer done() return ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ - Address: ap.PCfg.IbcHandlerAddress, - QueryData: param, - }) + Address: ap.PCfg.IbcHandlerAddress, + QueryData: param, + }) } func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) { @@ -492,6 +492,8 @@ func (ap *WasmProvider) QueryConnection(ctx context.Context, height int64, conne } func (ap *WasmProvider) QueryWasmProof(ctx context.Context, storageKey []byte, height int64) ([]byte, error) { + done := ap.SetSDKContext() + done() ibcAddr, err := sdk.AccAddressFromBech32(ap.PCfg.IbcHandlerAddress) if err != nil { return nil, err From 244d363e90787b9ca6348c4d4853c6d0b2b2972a Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 07:43:20 +0545 Subject: [PATCH 02/14] chore: add retry logic in wasm query method --- relayer/chains/wasm/query.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index c12228f17..33e59f49b 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -11,6 +11,7 @@ import ( "time" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" + "github.com/avast/retry-go/v4" abci "github.com/cometbft/cometbft/abci/types" rpcclient "github.com/cometbft/cometbft/rpc/client" tmtypes "github.com/cometbft/cometbft/types" @@ -19,6 +20,7 @@ import ( "github.com/cosmos/gogoproto/proto" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/icon-project/IBC-Integration/libraries/go/common/icon" + "go.uber.org/zap" querytypes "github.com/cosmos/cosmos-sdk/types/query" bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types" @@ -333,13 +335,25 @@ func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeig return clienttypes.NewQueryConsensusStateResponse(anyConsensusState, nil, clienttypes.NewHeight(0, uint64(chainHeight))), nil } -func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (*wasmtypes.QuerySmartContractStateResponse, error) { +func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (op *wasmtypes.QuerySmartContractStateResponse, err error) { + return op, retry.Do(func() error { done := ap.SetSDKContext() defer done() - return ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ + op, err = ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ Address: ap.PCfg.IbcHandlerAddress, QueryData: param, }) + return err + }, retry.Context(ctx), retry.Attempts(latestHeightQueryRetries), retry.Delay(50*time.Millisecond), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { + ap.log.Error( + "Failed to query", + zap.Uint("attempt", n+1), + zap.Uint("max_attempts", latestHeightQueryRetries), + zap.Any("Param", param), + zap.Error(err), + ) + })) + } func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) { From fb281b4503ede6c3bb5c31d58b0139681560cf0c Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 07:44:18 +0545 Subject: [PATCH 03/14] fix: add delay for pathprocessor sync --- relayer/chains/icon/icon_chain_processor.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index 514e201f9..51b5dba00 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -285,7 +285,6 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer return err } } - // } icp.log.Info("Start to query from height", zap.Int64("height", processedheight)) // subscribe to monitor block @@ -297,7 +296,7 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer icp.firstTime = true blockReq := &types.BlockRequest{ - Height: types.NewHexInt(int64(icp.chainProvider.PCfg.StartHeight)), + Height: types.NewHexInt(int64(icp.StartFromHeight(ctx))), EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress), } @@ -379,6 +378,9 @@ loop: break } time.Sleep(10 * time.Millisecond) + if icp.firstTime { + time.Sleep(4000 * time.Millisecond) + } icp.firstTime = false if br = nil; len(btpBlockRespCh) > 0 { br = <-btpBlockRespCh @@ -400,7 +402,7 @@ loop: if err != nil { return err } else if height != processedheight+i { - icp.log.Warn("Reconnect: missing block notification ", + icp.log.Warn("Reconnect: missing block notification", zap.Int64("got", height), zap.Int64("expected", processedheight+i), ) @@ -697,6 +699,7 @@ func (icp *IconChainProcessor) clientState(ctx context.Context, clientID string) if state, ok := icp.latestClientState[clientID]; ok { return state, nil } + cs, err := icp.chainProvider.QueryClientStateWithoutProof(ctx, int64(icp.latestBlock.Height), clientID) if err != nil { return provider.ClientState{}, err From 5c72d1923a4f631660dc0adf35e30e60bc796aeb Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 07:45:05 +0545 Subject: [PATCH 04/14] fix: remove tp when getting client state --- relayer/chains/wasm/wasm_chain_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 172185bc6..d6165899d 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -176,7 +176,7 @@ func (ccp *WasmChainProcessor) nodeStatusWithRetry(ctx context.Context) (status // clientState will return the most recent client state if client messages // have already been observed for the clientID, otherwise it will query for it. func (ccp *WasmChainProcessor) clientState(ctx context.Context, clientID string) (provider.ClientState, error) { - if state, ok := ccp.latestClientState[clientID]; ok && state.TrustingPeriod > 0 { + if state, ok := ccp.latestClientState[clientID]; ok { return state, nil } cs, err := ccp.chainProvider.QueryClientState(ctx, int64(ccp.latestBlock.Height), clientID) From df20a84ed92872fbc65cd43cc1ee0b5b6c2dd1da Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 10:16:49 +0545 Subject: [PATCH 05/14] fix: adding defer in queryWasmProof --- relayer/chains/wasm/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 33e59f49b..87cf03b3a 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -507,7 +507,7 @@ func (ap *WasmProvider) QueryConnection(ctx context.Context, height int64, conne func (ap *WasmProvider) QueryWasmProof(ctx context.Context, storageKey []byte, height int64) ([]byte, error) { done := ap.SetSDKContext() - done() + defer done() ibcAddr, err := sdk.AccAddressFromBech32(ap.PCfg.IbcHandlerAddress) if err != nil { return nil, err From afbcf3033230560599ab7ece93cd3ca29da2bd3b Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 10:51:48 +0545 Subject: [PATCH 06/14] fix: change to processheight --- relayer/chains/icon/icon_chain_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index 51b5dba00..e8f6a1e72 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -296,7 +296,7 @@ func (icp *IconChainProcessor) monitoring(ctx context.Context, persistence *quer icp.firstTime = true blockReq := &types.BlockRequest{ - Height: types.NewHexInt(int64(icp.StartFromHeight(ctx))), + Height: types.NewHexInt(int64(processedheight)), EventFilters: GetMonitorEventFilters(icp.chainProvider.PCfg.IbcHandlerAddress), } From f581195f56a0c9a5d9c07dbc78970dcc3ee4ae3e Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Tue, 22 Aug 2023 11:04:26 +0545 Subject: [PATCH 07/14] fix: limit max block fetch at a time in wasm processor --- relayer/chains/icon/provider.go | 2 +- relayer/chains/wasm/wasm_chain_processor.go | 12 ++++++++++-- relayer/common/const.go | 2 +- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/relayer/chains/icon/provider.go b/relayer/chains/icon/provider.go index 9d1ad4508..031453455 100644 --- a/relayer/chains/icon/provider.go +++ b/relayer/chains/icon/provider.go @@ -225,7 +225,7 @@ func (icp *IconProvider) NewClientState( return nil, fmt.Errorf("Blockinterval cannot be empty in Icon config") } - trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanosecondRatio)) + trustingBlockPeriod := uint64(dstTrustingPeriod) / (icp.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)) return &icon.ClientState{ // In case of Icon: Trusting Period is block Difference // see: light.proto in ibc-integration diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index d6165899d..883099d87 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -90,6 +90,7 @@ const ( defaultMinQueryLoopDuration = 1 * time.Second defaultBalanceUpdateWaitDuration = 60 * time.Second inSyncNumBlocksThreshold = 2 + MaxBlockFetch = 100 ) // latestClientState is a map of clientID to the latest clientInfo for that client. @@ -221,7 +222,7 @@ func (ccp *WasmChainProcessor) StartFromHeight(ctx context.Context) int { func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error { // this will be used for persistence across query cycle loop executions persistence := queryCyclePersistence{ - minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanosecondRatio)), + minQueryLoopDuration: time.Duration(ccp.chainProvider.PCfg.BlockInterval * uint64(common.NanoToMilliRatio)), lastBalanceUpdate: time.Unix(0, 0), balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration, } @@ -410,7 +411,14 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer chainID := ccp.chainProvider.ChainId() var latestHeader provider.IBCHeader - for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ { + syncUpHeight := func() int64 { + if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { + return persistence.latestQueriedBlock + MaxBlockFetch + } + return persistence.latestHeight + } + + for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ { var eg errgroup.Group var blockRes *ctypes.ResultBlockResults var lightBlock *types.LightBlock diff --git a/relayer/common/const.go b/relayer/common/const.go index 596e9630b..55dee1afc 100644 --- a/relayer/common/const.go +++ b/relayer/common/const.go @@ -15,7 +15,7 @@ var ( ConnectionKey = "connection" ChannelKey = "channel" ONE_HOUR = 60 * 60 * 1000 - NanosecondRatio = 1000_000 + NanoToMilliRatio = 1000_000 ) var ( From a2b9187b4989000b046cafc8722a16bfdf97cd0f Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 10:46:58 +0545 Subject: [PATCH 08/14] fix: btp block update miss --- relayer/chains/icon/icon_chain_processor.go | 1 - relayer/chains/icon/provider_test.go | 4 +- relayer/chains/wasm/tx.go | 2 - relayer/chains/wasm/wasm_chain_processor.go | 1 + relayer/processor/message_processor.go | 67 +++++++++++--------- relayer/processor/path_end_runtime.go | 13 ++-- relayer/processor/path_processor_internal.go | 46 ++++++++++++++ relayer/processor/types.go | 46 ++++++++++++++ relayer/processor/utils.go | 4 ++ 9 files changed, 143 insertions(+), 41 deletions(-) diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index e8f6a1e72..ceba82f72 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -657,7 +657,6 @@ func (icp *IconChainProcessor) handleBTPBlockRequest( } request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight)) request.response.IsProcessed = processed - } func (icp *IconChainProcessor) handlePathProcessorUpdate(ctx context.Context, diff --git a/relayer/chains/icon/provider_test.go b/relayer/chains/icon/provider_test.go index 7544bc7df..c52e7e83a 100644 --- a/relayer/chains/icon/provider_test.go +++ b/relayer/chains/icon/provider_test.go @@ -475,5 +475,5 @@ func TestHash(t *testing.T) { // isValid, err := VerifyBtpProof(decision, signedHeader.Signatures, proofContext) // assert.NoError(t, err) -// assert.True(t, isValid) -// } +// assert.True(t, isValid) +// } diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index 00692f19b..f1bdafdf6 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -1262,8 +1262,6 @@ func (cc *WasmProvider) QueryABCI(ctx context.Context, req abci.RequestQuery) (a func (cc *WasmProvider) handleAccountSequenceMismatchError(err error) { clientCtx := cc.ClientContext() - fmt.Println("client context is ", clientCtx.GetFromAddress()) - _, seq, err := cc.ClientCtx.AccountRetriever.GetAccountNumberSequence(clientCtx, clientCtx.GetFromAddress()) // sequences := numRegex.FindAllString(err.Error(), -1) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 883099d87..8cccfb723 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -123,6 +123,7 @@ func (l latestClientState) update(ctx context.Context, clientInfo clientInfo, cc // update latest if no existing state or provided consensus height is newer l[clientInfo.clientID] = clientState + } // Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions. diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 63102b95f..4ad75921a 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -104,26 +104,18 @@ func (mp *messageProcessor) processMessages( // Otherwise, it will be attempted if either 2/3 of the trusting period // or the configured client update threshold duration has passed. func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst *pathEndRuntime) (bool, error) { - var err error // handle if dst is IconLightClient if IsBTPLightClient(dst.clientState) { - - // if the latestblock is less than clientState height - if dst.clientState.ConsensusHeight.RevisionHeight >= src.latestBlock.Height { + if src.BTPHeightQueue.Size() == 0 { return false, nil } - header, found := src.ibcHeaderCache[src.latestBlock.Height] - if !found { - header, err = src.chainProvider.QueryIBCHeader(ctx, int64(src.latestBlock.Height)) - if err != nil { - return false, err - } - } - if header.IsCompleteBlock() { - return true, nil + btpHeightInfo := src.BTPHeightQueue.MustGetQueue() + + if btpHeightInfo.IsProcessing { + return false, nil } - return false, nil + return true, nil } // for lightClient other than ICON this will be helpful @@ -313,15 +305,25 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr clientID := dst.info.ClientID latestConsensusHeight := dst.clientState.ConsensusHeight + if src.BTPHeightQueue.Size() == 0 { + return nil + } + btpHeightInfo := src.BTPHeightQueue.MustGetQueue() + if !shouldUpdate { return nil } - if !src.latestHeader.IsCompleteBlock() { + header, err := src.chainProvider.QueryIBCHeader(ctx, btpHeightInfo.Height) + if err != nil { + return fmt.Errorf("Failed to query header for height %d", btpHeightInfo.Height) + } + + if !header.IsCompleteBlock() { return fmt.Errorf("Should Update is true but the Header is incomplete") } - if src.latestHeader.Height() <= latestConsensusHeight.RevisionHeight { + if header.Height() <= latestConsensusHeight.RevisionHeight { mp.log.Debug("Src latest header is less then latest client State", zap.String("chain-id", src.info.ChainID), zap.Int64("latest-header-height", int64(src.latestHeader.Height())), @@ -331,7 +333,7 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr } msgUpdateClientHeader, err := src.chainProvider.MsgUpdateClientHeader( - src.latestHeader, + header, latestConsensusHeight, dst.clientTrustedState.IBCHeader, ) @@ -348,18 +350,6 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr return nil } -func (mp *messageProcessor) findNextIBCHeader(ctx context.Context, src, dst *pathEndRuntime) (provider.IBCHeader, error) { - clientConsensusHeight := dst.clientState.ConsensusHeight - if IsBTPLightClient(dst.clientState) { - header, found := nextIconIBCHeader(src.ibcHeaderCache.Clone(), dst.lastClientUpdateHeight) - if !found { - return nil, fmt.Errorf("unable to find Icon IBC header for Next height of %d ", clientConsensusHeight.RevisionHeight) - } - return header, nil - } - return src.chainProvider.QueryIBCHeader(ctx, int64(clientConsensusHeight.RevisionHeight+1)) -} - // trackAndSendMessages will increment attempt counters for each message and send each message. // Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error // in a previous batch. @@ -414,10 +404,27 @@ func (mp *messageProcessor) sendClientUpdate( dst.lastClientUpdateHeightMu.Lock() dst.lastClientUpdateHeight = dst.latestBlock.Height dst.lastClientUpdateHeightMu.Unlock() + // if IsBTPLightClient(dst.clientState) { + // dst.lastClientUpdateHeightMu.Lock() + // dst.lastClientUpdateHeight = uint64(dst.BTPHeightQueue.MustGetQueue().Height) + // dst.lastClientUpdateHeightMu.Unlock() + // } msgs := []provider.RelayerMessage{mp.msgUpdateClient} - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil { + callback := func(rtr *provider.RelayerTxResponse, err error) { + + if IsBTPLightClient(dst.clientState) { + if rtr.Code == 0 { + src.BTPHeightQueue.Dequeue() + return + } + NewBlockInfoHeightQueue().MustGetQueue() + src.BTPHeightQueue.ReplaceQueue(0, BlockInfoHeight{Height: int64(dst.lastClientUpdateHeight), IsProcessing: false}) + } + } + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callback); err != nil { mp.log.Error("Error sending client update message", zap.String("path_name", src.info.PathName), zap.String("src_chain_id", src.info.ChainID), diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index c81e2c19f..39601f781 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -54,7 +54,8 @@ type pathEndRuntime struct { lastClientUpdateHeight uint64 lastClientUpdateHeightMu sync.Mutex - metrics *PrometheusMetrics + metrics *PrometheusMetrics + BTPHeightQueue *Queue[BlockInfoHeight] } func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetrics) *pathEndRuntime { @@ -76,6 +77,7 @@ func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetr clientICQProcessing: make(clientICQProcessingCache), connSubscribers: make(map[string][]func(provider.ConnectionInfo)), metrics: metrics, + BTPHeightQueue: NewBlockInfoHeightQueue(), } } @@ -385,15 +387,14 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func() pathEnd.latestBlock = d.LatestBlock pathEnd.latestBlockMu.Unlock() - if d.IsGenesis { - pathEnd.lastClientUpdateHeightMu.Lock() - pathEnd.lastClientUpdateHeight = d.LatestBlock.Height - pathEnd.lastClientUpdateHeightMu.Unlock() - } pathEnd.inSync = d.InSync pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState + if pathEnd.chainProvider.Type() == common.IconModule && d.LatestHeader.IsCompleteBlock() { + pathEnd.BTPHeightQueue.Enqueue(BlockInfoHeight{Height: int64(d.LatestHeader.Height()), IsProcessing: false}) + } + terminate, err := pathEnd.checkForMisbehaviour(ctx, pathEnd.clientState, counterParty) if err != nil { pathEnd.log.Error( diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index b5c55cd73..20cbc7a44 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" @@ -938,6 +939,10 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( pp.updateClientTrustedState(pp.pathEnd1, pp.pathEnd2) pp.updateClientTrustedState(pp.pathEnd2, pp.pathEnd1) + //for btp updateClient steps + pp.UpdateBTPHeight(ctx, pp.pathEnd1, pp.pathEnd2) + pp.UpdateBTPHeight(ctx, pp.pathEnd2, pp.pathEnd1) + channelPairs := pp.channelPairs() pp.queuePreInitMessages(cancel) @@ -1542,3 +1547,44 @@ func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { pp.log.Info("Found termination condition for flush, all caches cleared") return true } + +func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntime, dst *pathEndRuntime) { + srcIsIcon := src.chainProvider.Type() == common.IconModule + dstIsBtpClient := IsBTPLightClient(dst.clientState) + + fmt.Println(dst.clientState.ConsensusHeight.RevisionHeight) + + if !srcIsIcon && !dstIsBtpClient { + return + } + + if srcIsIcon && !dstIsBtpClient || !srcIsIcon && dstIsBtpClient { + pp.log.Error("Src Icon module mismatch with dst btp client", + zap.String("Src Chain Type ", src.chainProvider.Type()), + zap.String("Dst client Id", dst.clientState.ClientID), + ) + return + } + + if src.BTPHeightQueue.Size() == 0 { + return + } + for src.BTPHeightQueue.Size() > 0 { + btpHeightInfo := src.BTPHeightQueue.MustGetQueue() + + if dst.clientState.ConsensusHeight.RevisionHeight < uint64(btpHeightInfo.Height) { + break + } + if dst.clientState.ConsensusHeight.RevisionHeight == uint64(btpHeightInfo.Height) { + src.BTPHeightQueue.Dequeue() + continue + } + if dst.clientState.ConsensusHeight.RevisionHeight > uint64(btpHeightInfo.Height) { + cs, err := dst.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, clienttypes.NewHeight(0, uint64(btpHeightInfo.Height))) + if err == nil && cs != nil { + // removing latest height element + src.BTPHeightQueue.Dequeue() + } + } + } +} diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 331c6a70f..df13a5c0a 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -592,3 +592,49 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { CounterpartyConnID: info.CounterpartyConnID, } } + +type BlockInfoHeight struct { + Height int64 + IsProcessing bool +} + +type Queue[T any] struct { + items []T +} + +func (q *Queue[T]) Enqueue(item T) { + q.items = append(q.items, item) +} + +func (q *Queue[T]) MustGetQueue() T { + if q.Size() == 0 { + var element T + return element + } + item := q.items[0] + return item +} + +func (q *Queue[T]) ReplaceQueue(index int, element T) { + if q.Size() > index { + q.items[index] = element + } +} + +func (q *Queue[T]) Dequeue() (T, error) { + if q.Size() == 0 { + var element T + return element, fmt.Errorf("all element dequed") + } + item := q.items[0] + q.items = q.items[1:] + return item, nil +} + +func (q *Queue[T]) Size() int { + return len(q.items) +} + +func NewBlockInfoHeightQueue() *Queue[BlockInfoHeight] { + return &Queue[BlockInfoHeight]{} +} diff --git a/relayer/processor/utils.go b/relayer/processor/utils.go index 21f1d038a..1837546f9 100644 --- a/relayer/processor/utils.go +++ b/relayer/processor/utils.go @@ -53,3 +53,7 @@ func nextIconIBCHeader(heightMap IBCHeaderCache, height uint64) (provider.IBCHea header, ok := heightMap[nextHeight] return header, ok } + +func FindConsensusHeightFromEventLog([]provider.RelayerEvent) int64 { + return 0 +} From 9c3d3bd2b990bba6e0c0f1a561a678c8d05e085a Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 11:12:09 +0545 Subject: [PATCH 09/14] fix:add values --- relayer/processor/message_processor.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 4ad75921a..ae0a0f1f8 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -404,11 +404,11 @@ func (mp *messageProcessor) sendClientUpdate( dst.lastClientUpdateHeightMu.Lock() dst.lastClientUpdateHeight = dst.latestBlock.Height dst.lastClientUpdateHeightMu.Unlock() - // if IsBTPLightClient(dst.clientState) { - // dst.lastClientUpdateHeightMu.Lock() - // dst.lastClientUpdateHeight = uint64(dst.BTPHeightQueue.MustGetQueue().Height) - // dst.lastClientUpdateHeightMu.Unlock() - // } + if IsBTPLightClient(dst.clientState) { + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = uint64(dst.BTPHeightQueue.MustGetQueue().Height) + dst.lastClientUpdateHeightMu.Unlock() + } msgs := []provider.RelayerMessage{mp.msgUpdateClient} @@ -419,8 +419,11 @@ func (mp *messageProcessor) sendClientUpdate( src.BTPHeightQueue.Dequeue() return } - NewBlockInfoHeightQueue().MustGetQueue() - src.BTPHeightQueue.ReplaceQueue(0, BlockInfoHeight{Height: int64(dst.lastClientUpdateHeight), IsProcessing: false}) + + blockHeightInfo := NewBlockInfoHeightQueue().MustGetQueue() + if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { + src.BTPHeightQueue.ReplaceQueue(0, BlockInfoHeight{Height: int64(dst.lastClientUpdateHeight), IsProcessing: false}) + } } } From 4660bc8dcaaec7a285dd95ed31ae3b9ada87581b Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 11:19:43 +0545 Subject: [PATCH 10/14] fix: the logic --- relayer/processor/message_processor.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index ae0a0f1f8..35a4986ec 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -415,12 +415,17 @@ func (mp *messageProcessor) sendClientUpdate( callback := func(rtr *provider.RelayerTxResponse, err error) { if IsBTPLightClient(dst.clientState) { + if dst.BTPHeightQueue.Size() == 0 { + return + } + blockHeightInfo := dst.BTPHeightQueue.MustGetQueue() if rtr.Code == 0 { - src.BTPHeightQueue.Dequeue() + if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { + src.BTPHeightQueue.Dequeue() + } return } - - blockHeightInfo := NewBlockInfoHeightQueue().MustGetQueue() + // this would represent a failure case in that case isProcessing should be false if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { src.BTPHeightQueue.ReplaceQueue(0, BlockInfoHeight{Height: int64(dst.lastClientUpdateHeight), IsProcessing: false}) } From 95396e6c4104171f63ddc378f77e05da06ac4768 Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 20:11:41 +0545 Subject: [PATCH 11/14] fix: queue logic --- relayer/chains/icon/icon_chain_processor.go | 3 -- relayer/chains/wasm/query.go | 37 +++++++------ relayer/chains/wasm/tx.go | 28 +++++----- relayer/processor/message_processor.go | 56 ++++++++++++++++---- relayer/processor/path_end_runtime.go | 1 + relayer/processor/path_processor_internal.go | 5 +- relayer/processor/types.go | 30 +++++++++-- 7 files changed, 107 insertions(+), 53 deletions(-) diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index ceba82f72..4feaaa9a6 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -378,9 +378,6 @@ loop: break } time.Sleep(10 * time.Millisecond) - if icp.firstTime { - time.Sleep(4000 * time.Millisecond) - } icp.firstTime = false if br = nil; len(btpBlockRespCh) > 0 { br = <-btpBlockRespCh diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 87cf03b3a..b10bc8533 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -11,7 +11,6 @@ import ( "time" wasmtypes "github.com/CosmWasm/wasmd/x/wasm/types" - "github.com/avast/retry-go/v4" abci "github.com/cometbft/cometbft/abci/types" rpcclient "github.com/cometbft/cometbft/rpc/client" tmtypes "github.com/cometbft/cometbft/types" @@ -20,7 +19,6 @@ import ( "github.com/cosmos/gogoproto/proto" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/icon-project/IBC-Integration/libraries/go/common/icon" - "go.uber.org/zap" querytypes "github.com/cosmos/cosmos-sdk/types/query" bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types" @@ -336,23 +334,24 @@ func (ap *WasmProvider) QueryClientConsensusState(ctx context.Context, chainHeig } func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmtypes.RawContractMessage) (op *wasmtypes.QuerySmartContractStateResponse, err error) { - return op, retry.Do(func() error { - done := ap.SetSDKContext() - defer done() - op, err = ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ - Address: ap.PCfg.IbcHandlerAddress, - QueryData: param, - }) - return err - }, retry.Context(ctx), retry.Attempts(latestHeightQueryRetries), retry.Delay(50*time.Millisecond), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { - ap.log.Error( - "Failed to query", - zap.Uint("attempt", n+1), - zap.Uint("max_attempts", latestHeightQueryRetries), - zap.Any("Param", param), - zap.Error(err), - ) - })) + done := ap.SetSDKContext() + defer done() + return ap.QueryClient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ + Address: ap.PCfg.IbcHandlerAddress, + QueryData: param, + }) + // return op, retry.Do(func() error { + // op, err = + // return err + // }, retry.Context(ctx), retry.Attempts(latestHeightQueryRetries), retry.Delay(50*time.Millisecond), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { + // ap.log.Error( + // "Failed to query", + // zap.Uint("attempt", n+1), + // zap.Uint("max_attempts", latestHeightQueryRetries), + // zap.Any("Param", param), + // zap.Error(err), + // ) + // })) } diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index f1bdafdf6..64b34dfca 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -744,20 +744,20 @@ func (ap *WasmProvider) SendMessagesToMempool( return err } - if msg.Type() == MethodUpdateClient { - if err := retry.Do(func() error { - if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true); err != nil { - if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { - ap.handleAccountSequenceMismatchError(err) - } - } - return err - }, retry.Context(ctx), rtyAtt, retry.Delay(time.Millisecond*time.Duration(ap.PCfg.BlockInterval)), rtyErr); err != nil { - ap.log.Error("Failed to update client", zap.Any("Message", msg)) - return err - } - continue - } + // if msg.Type() == MethodUpdateClient { + // if err := retry.Do(func() error { + // if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true); err != nil { + // if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + // ap.handleAccountSequenceMismatchError(err) + // } + // } + // return err + // }, retry.Context(ctx), rtyAtt, retry.Delay(time.Millisecond*time.Duration(ap.PCfg.BlockInterval)), rtyErr); err != nil { + // ap.log.Error("Failed to update client", zap.Any("Message", msg)) + // return err + // } + // continue + // } if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, false); err != nil { if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { ap.handleAccountSequenceMismatchError(err) diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 35a4986ec..da2673ab6 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -109,8 +109,10 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst if src.BTPHeightQueue.Size() == 0 { return false, nil } - - btpHeightInfo := src.BTPHeightQueue.MustGetQueue() + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + return false, nil + } if btpHeightInfo.IsProcessing { return false, nil @@ -308,7 +310,10 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr if src.BTPHeightQueue.Size() == 0 { return nil } - btpHeightInfo := src.BTPHeightQueue.MustGetQueue() + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + return nil + } if !shouldUpdate { return nil @@ -401,12 +406,24 @@ func (mp *messageProcessor) sendClientUpdate( dst.log.Debug("Will relay client update") - dst.lastClientUpdateHeightMu.Lock() - dst.lastClientUpdateHeight = dst.latestBlock.Height - dst.lastClientUpdateHeightMu.Unlock() if IsBTPLightClient(dst.clientState) { + blockInfoHeight, err := src.BTPHeightQueue.GetQueue() + if err != nil { + mp.log.Debug("No message in the queue", zap.Error(err)) + return + } dst.lastClientUpdateHeightMu.Lock() - dst.lastClientUpdateHeight = uint64(dst.BTPHeightQueue.MustGetQueue().Height) + dst.lastClientUpdateHeight = uint64(blockInfoHeight.Height) + dst.lastClientUpdateHeightMu.Unlock() + src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ + Height: int64(blockInfoHeight.Height), + IsProcessing: true, + RetryCount: blockInfoHeight.RetryCount + 1, + }) + + } else { + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = dst.latestBlock.Height dst.lastClientUpdateHeightMu.Unlock() } @@ -414,11 +431,19 @@ func (mp *messageProcessor) sendClientUpdate( callback := func(rtr *provider.RelayerTxResponse, err error) { + mp.log.Debug("Executing callback of sendClientUpdate ", + zap.Any("Transaction Status", rtr.Code), + zap.Any("Response", rtr), + zap.Any("LastClientUpdateHeight", dst.lastClientUpdateHeight)) + if IsBTPLightClient(dst.clientState) { - if dst.BTPHeightQueue.Size() == 0 { + if src.BTPHeightQueue.Size() == 0 { + return + } + blockHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { return } - blockHeightInfo := dst.BTPHeightQueue.MustGetQueue() if rtr.Code == 0 { if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { src.BTPHeightQueue.Dequeue() @@ -427,7 +452,18 @@ func (mp *messageProcessor) sendClientUpdate( } // this would represent a failure case in that case isProcessing should be false if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { - src.BTPHeightQueue.ReplaceQueue(0, BlockInfoHeight{Height: int64(dst.lastClientUpdateHeight), IsProcessing: false}) + if blockHeightInfo.RetryCount >= 5 { + // removing btpBLock update + src.BTPHeightQueue.Dequeue() + return + } + + src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ + Height: int64(dst.lastClientUpdateHeight), + IsProcessing: false, + RetryCount: blockHeightInfo.RetryCount + 1, + }) + } } } diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 39601f781..9226d171c 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -451,6 +451,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, pathEndForHeight = pathEnd } + // should be setCounterparty because of this message is generated in response to TimeoutRequest packet if eventType == chantypes.EventTypeTimeoutPacket && IsBTPLightClient(pathEnd.clientState) { pathEndForHeight = counterparty } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 20cbc7a44..5cc49a470 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1552,8 +1552,6 @@ func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntim srcIsIcon := src.chainProvider.Type() == common.IconModule dstIsBtpClient := IsBTPLightClient(dst.clientState) - fmt.Println(dst.clientState.ConsensusHeight.RevisionHeight) - if !srcIsIcon && !dstIsBtpClient { return } @@ -1569,7 +1567,8 @@ func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntim if src.BTPHeightQueue.Size() == 0 { return } - for src.BTPHeightQueue.Size() > 0 { + size := src.BTPHeightQueue.Size() + for i := 0; i < size; i++ { btpHeightInfo := src.BTPHeightQueue.MustGetQueue() if dst.clientState.ConsensusHeight.RevisionHeight < uint64(btpHeightInfo.Height) { diff --git a/relayer/processor/types.go b/relayer/processor/types.go index df13a5c0a..9ff5d62e4 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -3,6 +3,7 @@ package processor import ( "fmt" "sort" + "sync" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/common" @@ -596,10 +597,12 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { type BlockInfoHeight struct { Height int64 IsProcessing bool + RetryCount int64 } type Queue[T any] struct { - items []T + items []T + itemMu *sync.Mutex } func (q *Queue[T]) Enqueue(item T) { @@ -608,20 +611,36 @@ func (q *Queue[T]) Enqueue(item T) { func (q *Queue[T]) MustGetQueue() T { if q.Size() == 0 { - var element T - return element + panic("the size of queue is zero") } item := q.items[0] return item } +func (q *Queue[T]) GetQueue() (T, error) { + + if q.Size() == 0 { + var element T + return element, fmt.Errorf("The queue is of empty length") + } + item := q.items[0] + return item, nil + +} + +var ( + zeroIndex = 0 +) + func (q *Queue[T]) ReplaceQueue(index int, element T) { + if q.Size() > index { q.items[index] = element } } func (q *Queue[T]) Dequeue() (T, error) { + if q.Size() == 0 { var element T return element, fmt.Errorf("all element dequed") @@ -636,5 +655,8 @@ func (q *Queue[T]) Size() int { } func NewBlockInfoHeightQueue() *Queue[BlockInfoHeight] { - return &Queue[BlockInfoHeight]{} + return &Queue[BlockInfoHeight]{ + items: make([]BlockInfoHeight, 0), + itemMu: &sync.Mutex{}, + } } From 6b8db31ff4b57204cd47be43be60d6158cce08b0 Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 20:21:10 +0545 Subject: [PATCH 12/14] fix:refractor --- relayer/processor/path_end_runtime.go | 3 ++- relayer/processor/types.go | 38 +++++++++++++++++---------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 9226d171c..26cfd01f1 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -55,7 +55,7 @@ type pathEndRuntime struct { lastClientUpdateHeightMu sync.Mutex metrics *PrometheusMetrics - BTPHeightQueue *Queue[BlockInfoHeight] + BTPHeightQueue Queue[BlockInfoHeight] } func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetrics) *pathEndRuntime { @@ -476,6 +476,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, ) return false } + } if !pathEnd.channelStateCache[k] { diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 9ff5d62e4..d9a4b647a 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -11,6 +11,10 @@ import ( "go.uber.org/zap/zapcore" ) +var ( + zeroIndex = 0 +) + // MessageLifecycle is used to send an initial IBC message to a chain // once the chains are in sync for the PathProcessor. // It also allows setting a stop condition for the PathProcessor. @@ -594,22 +598,33 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { } } +// binaryTree + +type Queue[T any] interface { + Enqueue(item T) + Dequeue() (T, error) + MustGetQueue() T + GetQueue() (T, error) + ReplaceQueue(index int, item T) + Size() int +} + type BlockInfoHeight struct { Height int64 IsProcessing bool RetryCount int64 } -type Queue[T any] struct { +type ArrayQueue[T any] struct { items []T itemMu *sync.Mutex } -func (q *Queue[T]) Enqueue(item T) { +func (q *ArrayQueue[T]) Enqueue(item T) { q.items = append(q.items, item) } -func (q *Queue[T]) MustGetQueue() T { +func (q *ArrayQueue[T]) MustGetQueue() T { if q.Size() == 0 { panic("the size of queue is zero") } @@ -617,7 +632,7 @@ func (q *Queue[T]) MustGetQueue() T { return item } -func (q *Queue[T]) GetQueue() (T, error) { +func (q *ArrayQueue[T]) GetQueue() (T, error) { if q.Size() == 0 { var element T @@ -628,18 +643,13 @@ func (q *Queue[T]) GetQueue() (T, error) { } -var ( - zeroIndex = 0 -) - -func (q *Queue[T]) ReplaceQueue(index int, element T) { - +func (q *ArrayQueue[T]) ReplaceQueue(index int, element T) { if q.Size() > index { q.items[index] = element } } -func (q *Queue[T]) Dequeue() (T, error) { +func (q *ArrayQueue[T]) Dequeue() (T, error) { if q.Size() == 0 { var element T @@ -650,12 +660,12 @@ func (q *Queue[T]) Dequeue() (T, error) { return item, nil } -func (q *Queue[T]) Size() int { +func (q *ArrayQueue[T]) Size() int { return len(q.items) } -func NewBlockInfoHeightQueue() *Queue[BlockInfoHeight] { - return &Queue[BlockInfoHeight]{ +func NewBlockInfoHeightQueue() *ArrayQueue[BlockInfoHeight] { + return &ArrayQueue[BlockInfoHeight]{ items: make([]BlockInfoHeight, 0), itemMu: &sync.Mutex{}, } From 5145d20a1bf30370317825d650d65efccc975e4d Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Wed, 23 Aug 2023 21:18:55 +0545 Subject: [PATCH 13/14] fix: implement mutex in queue --- relayer/processor/path_end_runtime.go | 9 ++++- relayer/processor/types.go | 54 +++++++++++++++++++++------ 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 26cfd01f1..0a0ef7cb2 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -77,7 +77,7 @@ func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetr clientICQProcessing: make(clientICQProcessingCache), connSubscribers: make(map[string][]func(provider.ConnectionInfo)), metrics: metrics, - BTPHeightQueue: NewBlockInfoHeightQueue(), + BTPHeightQueue: NewBlockInfoHeightQueue[BlockInfoHeight](), } } @@ -477,6 +477,13 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, return false } + if counterparty.BTPHeightQueue.ItemExist(message.info.Height) { + pathEnd.log.Debug("Waiting to relay packet message until clientState is in queue", + zap.Inline(message), + zap.String("event_type", eventType), + ) + return false + } } if !pathEnd.channelStateCache[k] { diff --git a/relayer/processor/types.go b/relayer/processor/types.go index d9a4b647a..f32a636d3 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -598,42 +598,69 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { } } -// binaryTree - type Queue[T any] interface { Enqueue(item T) Dequeue() (T, error) MustGetQueue() T GetQueue() (T, error) + ItemExist(interface{}) bool ReplaceQueue(index int, item T) Size() int } +type ExistenceChecker interface { + Exists(target interface{}) bool +} + type BlockInfoHeight struct { Height int64 IsProcessing bool RetryCount int64 } -type ArrayQueue[T any] struct { - items []T - itemMu *sync.Mutex +func (bi BlockInfoHeight) Exists(target interface{}) bool { + if height, ok := target.(int64); ok { + return bi.Height == height + } + return false +} + +type ArrayQueue[T ExistenceChecker] struct { + items []T + mu *sync.Mutex } func (q *ArrayQueue[T]) Enqueue(item T) { + q.mu.Lock() + defer q.mu.Unlock() q.items = append(q.items, item) } func (q *ArrayQueue[T]) MustGetQueue() T { + q.mu.Lock() + defer q.mu.Unlock() if q.Size() == 0 { panic("the size of queue is zero") } + item := q.items[0] return item } -func (q *ArrayQueue[T]) GetQueue() (T, error) { +func (q *ArrayQueue[T]) ItemExist(target interface{}) bool { + q.mu.Lock() + defer q.mu.Unlock() + for _, item := range q.items { + if item.Exists(target) { + return true + } + } + return false +} +func (q *ArrayQueue[T]) GetQueue() (T, error) { + q.mu.Lock() + defer q.mu.Unlock() if q.Size() == 0 { var element T return element, fmt.Errorf("The queue is of empty length") @@ -644,13 +671,16 @@ func (q *ArrayQueue[T]) GetQueue() (T, error) { } func (q *ArrayQueue[T]) ReplaceQueue(index int, element T) { - if q.Size() > index { + q.mu.Lock() + defer q.mu.Unlock() + if index >= 0 && index < len(q.items) { q.items[index] = element } } func (q *ArrayQueue[T]) Dequeue() (T, error) { - + q.mu.Lock() + defer q.mu.Unlock() if q.Size() == 0 { var element T return element, fmt.Errorf("all element dequed") @@ -664,9 +694,9 @@ func (q *ArrayQueue[T]) Size() int { return len(q.items) } -func NewBlockInfoHeightQueue() *ArrayQueue[BlockInfoHeight] { - return &ArrayQueue[BlockInfoHeight]{ - items: make([]BlockInfoHeight, 0), - itemMu: &sync.Mutex{}, +func NewBlockInfoHeightQueue[T ExistenceChecker]() *ArrayQueue[T] { + return &ArrayQueue[T]{ + items: make([]T, 0), + mu: &sync.Mutex{}, } } From 87a9477e55809dff3770bc919a79d7a9939ffd1a Mon Sep 17 00:00:00 2001 From: viveksharmapoudel Date: Thu, 24 Aug 2023 12:39:14 +0545 Subject: [PATCH 14/14] fix: add prevconsensusstateheight method and use it to fetch trusted height --- relayer/chains/cosmos/query.go | 6 ++++ relayer/chains/icon/query.go | 5 +++ relayer/chains/penumbra/query.go | 5 +++ relayer/chains/wasm/query.go | 21 +++++++++++++ relayer/chains/wasm/types/types.go | 32 +++++++++++++++----- relayer/processor/message_processor.go | 17 ++++++++--- relayer/processor/path_end_runtime.go | 2 +- relayer/processor/path_processor_internal.go | 6 ++-- relayer/provider/provider.go | 2 +- 9 files changed, 80 insertions(+), 16 deletions(-) diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 6e438727f..5e03ed3c6 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -28,6 +28,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" host "github.com/cosmos/ibc-go/v7/modules/core/24-host" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/cosmos/relayer/v2/relayer/provider" @@ -1169,3 +1170,8 @@ func (cc *CosmosProvider) QueryConsensusStateABCI(ctx context.Context, clientID ProofHeight: proofHeight, }, nil } + +func (ap *CosmosProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") + +} diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index ce6b892f1..6d1668a5e 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -12,6 +12,7 @@ import ( "github.com/avast/retry-go/v4" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/pkg/errors" "go.uber.org/zap" @@ -837,6 +838,10 @@ func (icp *IconProvider) QueryIconProof(ctx context.Context, height int64, keyHa return nil, nil } +func (ip *IconProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") +} + func (icp *IconProvider) HexStringToProtoUnmarshal(encoded string, v proto.Message) ([]byte, error) { if encoded == "" { return nil, fmt.Errorf("Encoded string is empty ") diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index da651ef93..e20a64889 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -25,6 +25,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" host "github.com/cosmos/ibc-go/v7/modules/core/24-host" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/cosmos/relayer/v2/relayer/provider" @@ -983,3 +984,7 @@ func (cc *PenumbraProvider) QueryICQWithProof(ctx context.Context, msgType strin //TODO implement me panic("implement me") } + +func (cc *PenumbraProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") +} diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index b10bc8533..e7d48d495 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -27,6 +27,7 @@ import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/chains/wasm/types" @@ -858,3 +859,23 @@ func (ap *WasmProvider) QueryDenomTraces(ctx context.Context, offset, limit uint } return transfers, nil } + +func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + param, err := types.NewPrevConsensusStateHeight(clientId, uint64(clientHeight)).Bytes() + res, err := ap.QueryIBCHandlerContract(ctx, param) + if err != nil { + return nil, err + } + + var heights []int64 + err = json.Unmarshal(res.Data.Bytes(), &heights) + + if err != nil { + return nil, err + } + + if len(heights) == 0 { + return nil, fmt.Errorf("consensus state of client %s before %d", clientId, clientHeight) + } + return clienttypes.Height{RevisionNumber: 0, RevisionHeight: uint64(heights[0])}, nil +} diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index b0826d463..f6085a42c 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -71,11 +71,13 @@ func NewClientState(clientId string) *GetClientState { } } +type ConsensusStateByHeight struct { + ClientId string "json:\"client_id\"" + Height uint64 "json:\"height\"" +} + type GetConsensusStateByHeight struct { - ConsensusStateByHeight struct { - ClientId string "json:\"client_id\"" - Height uint64 "json:\"height\"" - } `json:"get_consensus_state_by_height"` + ConsensusStateByHeight ConsensusStateByHeight `json:"get_consensus_state_by_height"` } func (x *GetConsensusStateByHeight) Bytes() ([]byte, error) { @@ -84,10 +86,7 @@ func (x *GetConsensusStateByHeight) Bytes() ([]byte, error) { func NewConsensusStateByHeight(clientId string, height uint64) *GetConsensusStateByHeight { return &GetConsensusStateByHeight{ - ConsensusStateByHeight: struct { - ClientId string "json:\"client_id\"" - Height uint64 "json:\"height\"" - }{ + ConsensusStateByHeight: ConsensusStateByHeight{ ClientId: clientId, Height: height, }, @@ -353,3 +352,20 @@ func NewCommitmentPrefix() *GetCommitmentPrefix { GetCommitment: struct{}{}, } } + +type GetPrevConsensusStateHeight struct { + ConsensusStateByHeight ConsensusStateByHeight `json:"get_previous_consensus_state_height"` +} + +func (x *GetPrevConsensusStateHeight) Bytes() ([]byte, error) { + return json.Marshal(x) +} + +func NewPrevConsensusStateHeight(clientId string, height uint64) *GetPrevConsensusStateHeight { + return &GetPrevConsensusStateHeight{ + ConsensusStateByHeight: ConsensusStateByHeight{ + ClientId: clientId, + Height: height, + }, + } +} diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index da2673ab6..44ff7a9ab 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" @@ -229,7 +230,7 @@ func (mp *messageProcessor) assembleMessage( func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { if IsBTPLightClient(dst.clientState) { - err := mp.handleMsgUpdateClientForIcon(ctx, src, dst, shouldUpdate) + err := mp.handleMsgUpdateClientForBTPClient(ctx, src, dst, shouldUpdate) return err } @@ -302,7 +303,7 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds return nil } -func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { +func (mp *messageProcessor) handleMsgUpdateClientForBTPClient(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { clientID := dst.info.ClientID latestConsensusHeight := dst.clientState.ConsensusHeight @@ -332,15 +333,23 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr mp.log.Debug("Src latest header is less then latest client State", zap.String("chain-id", src.info.ChainID), zap.Int64("latest-header-height", int64(src.latestHeader.Height())), + zap.Int64("message processing btp-height", int64(src.latestHeader.Height())), zap.Int64("client-state-height", int64(latestConsensusHeight.RevisionHeight))) - return nil + height, err := dst.chainProvider.QueryClientPrevConsensusStateHeight(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, int64(header.Height())) + if err != nil { + return fmt.Errorf("Failed to query prevClientConsensusState") + } + latestConsensusHeight = types.Height{ + RevisionNumber: height.GetRevisionNumber(), + RevisionHeight: height.GetRevisionHeight(), + } } msgUpdateClientHeader, err := src.chainProvider.MsgUpdateClientHeader( header, latestConsensusHeight, - dst.clientTrustedState.IBCHeader, + nil, ) if err != nil { return fmt.Errorf("error assembling new client header: %w", err) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index 0a0ef7cb2..c134ab8bf 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -476,8 +476,8 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, ) return false } + if counterparty.BTPHeightQueue.ItemExist(int64(message.info.Height)) { - if counterparty.BTPHeightQueue.ItemExist(message.info.Height) { pathEnd.log.Debug("Waiting to relay packet message until clientState is in queue", zap.Inline(message), zap.String("event_type", eventType), diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 5cc49a470..8dd0c7f2f 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1569,8 +1569,10 @@ func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntim } size := src.BTPHeightQueue.Size() for i := 0; i < size; i++ { - btpHeightInfo := src.BTPHeightQueue.MustGetQueue() - + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + continue + } if dst.clientState.ConsensusHeight.RevisionHeight < uint64(btpHeightInfo.Height) { break } diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index a548eae24..94eb8be35 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -445,7 +445,7 @@ type QueryProvider interface { QueryUpgradedConsState(ctx context.Context, height int64) (*clienttypes.QueryConsensusStateResponse, error) QueryConsensusState(ctx context.Context, height int64) (ibcexported.ConsensusState, int64, error) QueryClients(ctx context.Context) (clienttypes.IdentifiedClientStates, error) - + QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clinetId string, clientHeight int64) (ibcexported.Height, error) // ics 03 - connection QueryConnection(ctx context.Context, height int64, connectionid string) (*conntypes.QueryConnectionResponse, error) QueryConnections(ctx context.Context) (conns []*conntypes.IdentifiedConnection, err error)