diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 6e438727f..5e03ed3c6 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -28,6 +28,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" host "github.com/cosmos/ibc-go/v7/modules/core/24-host" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/cosmos/relayer/v2/relayer/provider" @@ -1169,3 +1170,8 @@ func (cc *CosmosProvider) QueryConsensusStateABCI(ctx context.Context, clientID ProofHeight: proofHeight, }, nil } + +func (ap *CosmosProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") + +} diff --git a/relayer/chains/icon/icon_chain_processor.go b/relayer/chains/icon/icon_chain_processor.go index e8f6a1e72..ceba82f72 100644 --- a/relayer/chains/icon/icon_chain_processor.go +++ b/relayer/chains/icon/icon_chain_processor.go @@ -657,7 +657,6 @@ func (icp *IconChainProcessor) handleBTPBlockRequest( } request.response.Header = NewIconIBCHeader(btpHeader, validators, int64(btpHeader.MainHeight)) request.response.IsProcessed = processed - } func (icp *IconChainProcessor) handlePathProcessorUpdate(ctx context.Context, diff --git a/relayer/chains/icon/provider_test.go b/relayer/chains/icon/provider_test.go index 7544bc7df..c52e7e83a 100644 --- a/relayer/chains/icon/provider_test.go +++ b/relayer/chains/icon/provider_test.go @@ -475,5 +475,5 @@ func TestHash(t *testing.T) { // isValid, err := VerifyBtpProof(decision, signedHeader.Signatures, proofContext) // assert.NoError(t, err) -// assert.True(t, isValid) -// } +// assert.True(t, isValid) +// } diff --git a/relayer/chains/icon/query.go b/relayer/chains/icon/query.go index ce6b892f1..6d1668a5e 100644 --- a/relayer/chains/icon/query.go +++ b/relayer/chains/icon/query.go @@ -12,6 +12,7 @@ import ( "github.com/avast/retry-go/v4" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/pkg/errors" "go.uber.org/zap" @@ -837,6 +838,10 @@ func (icp *IconProvider) QueryIconProof(ctx context.Context, height int64, keyHa return nil, nil } +func (ip *IconProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") +} + func (icp *IconProvider) HexStringToProtoUnmarshal(encoded string, v proto.Message) ([]byte, error) { if encoded == "" { return nil, fmt.Errorf("Encoded string is empty ") diff --git a/relayer/chains/penumbra/query.go b/relayer/chains/penumbra/query.go index da651ef93..e20a64889 100644 --- a/relayer/chains/penumbra/query.go +++ b/relayer/chains/penumbra/query.go @@ -25,6 +25,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" host "github.com/cosmos/ibc-go/v7/modules/core/24-host" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" "github.com/cosmos/relayer/v2/relayer/provider" @@ -983,3 +984,7 @@ func (cc *PenumbraProvider) QueryICQWithProof(ctx context.Context, msgType strin //TODO implement me panic("implement me") } + +func (cc *PenumbraProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + panic("QueryClientPrevConsensusStateHeight not implemented") +} diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 87cf03b3a..32ae89ba0 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -29,6 +29,7 @@ import ( conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" commitmenttypes "github.com/cosmos/ibc-go/v7/modules/core/23-commitment/types" + "github.com/cosmos/ibc-go/v7/modules/core/exported" ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/chains/wasm/types" @@ -353,7 +354,6 @@ func (ap *WasmProvider) QueryIBCHandlerContract(ctx context.Context, param wasmt zap.Error(err), ) })) - } func (ap *WasmProvider) QueryIBCHandlerContractProcessed(ctx context.Context, param wasmtypes.RawContractMessage) ([]byte, error) { @@ -859,3 +859,23 @@ func (ap *WasmProvider) QueryDenomTraces(ctx context.Context, offset, limit uint } return transfers, nil } + +func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clientId string, clientHeight int64) (exported.Height, error) { + param, err := types.NewPrevConsensusStateHeight(clientId, uint64(clientHeight)).Bytes() + res, err := ap.QueryIBCHandlerContract(ctx, param) + if err != nil { + return nil, err + } + + var heights []int64 + err = json.Unmarshal(res.Data.Bytes(), &heights) + + if err != nil { + return nil, err + } + + if len(heights) == 0 { + return nil, fmt.Errorf("consensus state of client %s before %d", clientId, clientHeight) + } + return clienttypes.Height{RevisionNumber: 0, RevisionHeight: uint64(heights[0])}, nil +} diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index 00692f19b..64b34dfca 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -744,20 +744,20 @@ func (ap *WasmProvider) SendMessagesToMempool( return err } - if msg.Type() == MethodUpdateClient { - if err := retry.Do(func() error { - if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true); err != nil { - if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { - ap.handleAccountSequenceMismatchError(err) - } - } - return err - }, retry.Context(ctx), rtyAtt, retry.Delay(time.Millisecond*time.Duration(ap.PCfg.BlockInterval)), rtyErr); err != nil { - ap.log.Error("Failed to update client", zap.Any("Message", msg)) - return err - } - continue - } + // if msg.Type() == MethodUpdateClient { + // if err := retry.Do(func() error { + // if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, true); err != nil { + // if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + // ap.handleAccountSequenceMismatchError(err) + // } + // } + // return err + // }, retry.Context(ctx), rtyAtt, retry.Delay(time.Millisecond*time.Duration(ap.PCfg.BlockInterval)), rtyErr); err != nil { + // ap.log.Error("Failed to update client", zap.Any("Message", msg)) + // return err + // } + // continue + // } if err := ap.BroadcastTx(cliCtx, txBytes, []provider.RelayerMessage{msg}, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback, false); err != nil { if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { ap.handleAccountSequenceMismatchError(err) @@ -1262,8 +1262,6 @@ func (cc *WasmProvider) QueryABCI(ctx context.Context, req abci.RequestQuery) (a func (cc *WasmProvider) handleAccountSequenceMismatchError(err error) { clientCtx := cc.ClientContext() - fmt.Println("client context is ", clientCtx.GetFromAddress()) - _, seq, err := cc.ClientCtx.AccountRetriever.GetAccountNumberSequence(clientCtx, clientCtx.GetFromAddress()) // sequences := numRegex.FindAllString(err.Error(), -1) diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index b0826d463..f6085a42c 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -71,11 +71,13 @@ func NewClientState(clientId string) *GetClientState { } } +type ConsensusStateByHeight struct { + ClientId string "json:\"client_id\"" + Height uint64 "json:\"height\"" +} + type GetConsensusStateByHeight struct { - ConsensusStateByHeight struct { - ClientId string "json:\"client_id\"" - Height uint64 "json:\"height\"" - } `json:"get_consensus_state_by_height"` + ConsensusStateByHeight ConsensusStateByHeight `json:"get_consensus_state_by_height"` } func (x *GetConsensusStateByHeight) Bytes() ([]byte, error) { @@ -84,10 +86,7 @@ func (x *GetConsensusStateByHeight) Bytes() ([]byte, error) { func NewConsensusStateByHeight(clientId string, height uint64) *GetConsensusStateByHeight { return &GetConsensusStateByHeight{ - ConsensusStateByHeight: struct { - ClientId string "json:\"client_id\"" - Height uint64 "json:\"height\"" - }{ + ConsensusStateByHeight: ConsensusStateByHeight{ ClientId: clientId, Height: height, }, @@ -353,3 +352,20 @@ func NewCommitmentPrefix() *GetCommitmentPrefix { GetCommitment: struct{}{}, } } + +type GetPrevConsensusStateHeight struct { + ConsensusStateByHeight ConsensusStateByHeight `json:"get_previous_consensus_state_height"` +} + +func (x *GetPrevConsensusStateHeight) Bytes() ([]byte, error) { + return json.Marshal(x) +} + +func NewPrevConsensusStateHeight(clientId string, height uint64) *GetPrevConsensusStateHeight { + return &GetPrevConsensusStateHeight{ + ConsensusStateByHeight: ConsensusStateByHeight{ + ClientId: clientId, + Height: height, + }, + } +} diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index d684f26bf..d1e2241a9 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -123,6 +123,7 @@ func (l latestClientState) update(ctx context.Context, clientInfo clientInfo, cc // update latest if no existing state or provided consensus height is newer l[clientInfo.clientID] = clientState + } // Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions. diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 63102b95f..44ff7a9ab 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" @@ -104,26 +105,20 @@ func (mp *messageProcessor) processMessages( // Otherwise, it will be attempted if either 2/3 of the trusting period // or the configured client update threshold duration has passed. func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst *pathEndRuntime) (bool, error) { - var err error // handle if dst is IconLightClient if IsBTPLightClient(dst.clientState) { - - // if the latestblock is less than clientState height - if dst.clientState.ConsensusHeight.RevisionHeight >= src.latestBlock.Height { + if src.BTPHeightQueue.Size() == 0 { return false, nil } - - header, found := src.ibcHeaderCache[src.latestBlock.Height] - if !found { - header, err = src.chainProvider.QueryIBCHeader(ctx, int64(src.latestBlock.Height)) - if err != nil { - return false, err - } + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + return false, nil } - if header.IsCompleteBlock() { - return true, nil + + if btpHeightInfo.IsProcessing { + return false, nil } - return false, nil + return true, nil } // for lightClient other than ICON this will be helpful @@ -235,7 +230,7 @@ func (mp *messageProcessor) assembleMessage( func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { if IsBTPLightClient(dst.clientState) { - err := mp.handleMsgUpdateClientForIcon(ctx, src, dst, shouldUpdate) + err := mp.handleMsgUpdateClientForBTPClient(ctx, src, dst, shouldUpdate) return err } @@ -308,32 +303,53 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds return nil } -func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { +func (mp *messageProcessor) handleMsgUpdateClientForBTPClient(ctx context.Context, src, dst *pathEndRuntime, shouldUpdate bool) error { clientID := dst.info.ClientID latestConsensusHeight := dst.clientState.ConsensusHeight + if src.BTPHeightQueue.Size() == 0 { + return nil + } + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + return nil + } + if !shouldUpdate { return nil } - if !src.latestHeader.IsCompleteBlock() { + header, err := src.chainProvider.QueryIBCHeader(ctx, btpHeightInfo.Height) + if err != nil { + return fmt.Errorf("Failed to query header for height %d", btpHeightInfo.Height) + } + + if !header.IsCompleteBlock() { return fmt.Errorf("Should Update is true but the Header is incomplete") } - if src.latestHeader.Height() <= latestConsensusHeight.RevisionHeight { + if header.Height() <= latestConsensusHeight.RevisionHeight { mp.log.Debug("Src latest header is less then latest client State", zap.String("chain-id", src.info.ChainID), zap.Int64("latest-header-height", int64(src.latestHeader.Height())), + zap.Int64("message processing btp-height", int64(src.latestHeader.Height())), zap.Int64("client-state-height", int64(latestConsensusHeight.RevisionHeight))) - return nil + height, err := dst.chainProvider.QueryClientPrevConsensusStateHeight(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, int64(header.Height())) + if err != nil { + return fmt.Errorf("Failed to query prevClientConsensusState") + } + latestConsensusHeight = types.Height{ + RevisionNumber: height.GetRevisionNumber(), + RevisionHeight: height.GetRevisionHeight(), + } } msgUpdateClientHeader, err := src.chainProvider.MsgUpdateClientHeader( - src.latestHeader, + header, latestConsensusHeight, - dst.clientTrustedState.IBCHeader, + nil, ) if err != nil { return fmt.Errorf("error assembling new client header: %w", err) @@ -348,18 +364,6 @@ func (mp *messageProcessor) handleMsgUpdateClientForIcon(ctx context.Context, sr return nil } -func (mp *messageProcessor) findNextIBCHeader(ctx context.Context, src, dst *pathEndRuntime) (provider.IBCHeader, error) { - clientConsensusHeight := dst.clientState.ConsensusHeight - if IsBTPLightClient(dst.clientState) { - header, found := nextIconIBCHeader(src.ibcHeaderCache.Clone(), dst.lastClientUpdateHeight) - if !found { - return nil, fmt.Errorf("unable to find Icon IBC header for Next height of %d ", clientConsensusHeight.RevisionHeight) - } - return header, nil - } - return src.chainProvider.QueryIBCHeader(ctx, int64(clientConsensusHeight.RevisionHeight+1)) -} - // trackAndSendMessages will increment attempt counters for each message and send each message. // Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error // in a previous batch. @@ -411,13 +415,69 @@ func (mp *messageProcessor) sendClientUpdate( dst.log.Debug("Will relay client update") - dst.lastClientUpdateHeightMu.Lock() - dst.lastClientUpdateHeight = dst.latestBlock.Height - dst.lastClientUpdateHeightMu.Unlock() + if IsBTPLightClient(dst.clientState) { + blockInfoHeight, err := src.BTPHeightQueue.GetQueue() + if err != nil { + mp.log.Debug("No message in the queue", zap.Error(err)) + return + } + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = uint64(blockInfoHeight.Height) + dst.lastClientUpdateHeightMu.Unlock() + src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ + Height: int64(blockInfoHeight.Height), + IsProcessing: true, + RetryCount: blockInfoHeight.RetryCount + 1, + }) + + } else { + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = dst.latestBlock.Height + dst.lastClientUpdateHeightMu.Unlock() + } msgs := []provider.RelayerMessage{mp.msgUpdateClient} - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil { + callback := func(rtr *provider.RelayerTxResponse, err error) { + + mp.log.Debug("Executing callback of sendClientUpdate ", + zap.Any("Transaction Status", rtr.Code), + zap.Any("Response", rtr), + zap.Any("LastClientUpdateHeight", dst.lastClientUpdateHeight)) + + if IsBTPLightClient(dst.clientState) { + if src.BTPHeightQueue.Size() == 0 { + return + } + blockHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + return + } + if rtr.Code == 0 { + if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { + src.BTPHeightQueue.Dequeue() + } + return + } + // this would represent a failure case in that case isProcessing should be false + if blockHeightInfo.Height == int64(dst.lastClientUpdateHeight) { + if blockHeightInfo.RetryCount >= 5 { + // removing btpBLock update + src.BTPHeightQueue.Dequeue() + return + } + + src.BTPHeightQueue.ReplaceQueue(zeroIndex, BlockInfoHeight{ + Height: int64(dst.lastClientUpdateHeight), + IsProcessing: false, + RetryCount: blockHeightInfo.RetryCount + 1, + }) + + } + } + } + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callback); err != nil { mp.log.Error("Error sending client update message", zap.String("path_name", src.info.PathName), zap.String("src_chain_id", src.info.ChainID), diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index c81e2c19f..c134ab8bf 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -54,7 +54,8 @@ type pathEndRuntime struct { lastClientUpdateHeight uint64 lastClientUpdateHeightMu sync.Mutex - metrics *PrometheusMetrics + metrics *PrometheusMetrics + BTPHeightQueue Queue[BlockInfoHeight] } func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetrics) *pathEndRuntime { @@ -76,6 +77,7 @@ func newPathEndRuntime(log *zap.Logger, pathEnd PathEnd, metrics *PrometheusMetr clientICQProcessing: make(clientICQProcessingCache), connSubscribers: make(map[string][]func(provider.ConnectionInfo)), metrics: metrics, + BTPHeightQueue: NewBlockInfoHeightQueue[BlockInfoHeight](), } } @@ -385,15 +387,14 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func() pathEnd.latestBlock = d.LatestBlock pathEnd.latestBlockMu.Unlock() - if d.IsGenesis { - pathEnd.lastClientUpdateHeightMu.Lock() - pathEnd.lastClientUpdateHeight = d.LatestBlock.Height - pathEnd.lastClientUpdateHeightMu.Unlock() - } pathEnd.inSync = d.InSync pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState + if pathEnd.chainProvider.Type() == common.IconModule && d.LatestHeader.IsCompleteBlock() { + pathEnd.BTPHeightQueue.Enqueue(BlockInfoHeight{Height: int64(d.LatestHeader.Height()), IsProcessing: false}) + } + terminate, err := pathEnd.checkForMisbehaviour(ctx, pathEnd.clientState, counterParty) if err != nil { pathEnd.log.Error( @@ -450,6 +451,7 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, pathEndForHeight = pathEnd } + // should be setCounterparty because of this message is generated in response to TimeoutRequest packet if eventType == chantypes.EventTypeTimeoutPacket && IsBTPLightClient(pathEnd.clientState) { pathEndForHeight = counterparty } @@ -474,6 +476,14 @@ func (pathEnd *pathEndRuntime) shouldSendPacketMessage(message packetIBCMessage, ) return false } + if counterparty.BTPHeightQueue.ItemExist(int64(message.info.Height)) { + + pathEnd.log.Debug("Waiting to relay packet message until clientState is in queue", + zap.Inline(message), + zap.String("event_type", eventType), + ) + return false + } } if !pathEnd.channelStateCache[k] { diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index b5c55cd73..8dd0c7f2f 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -8,6 +8,7 @@ import ( "sort" "sync" + clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" @@ -938,6 +939,10 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( pp.updateClientTrustedState(pp.pathEnd1, pp.pathEnd2) pp.updateClientTrustedState(pp.pathEnd2, pp.pathEnd1) + //for btp updateClient steps + pp.UpdateBTPHeight(ctx, pp.pathEnd1, pp.pathEnd2) + pp.UpdateBTPHeight(ctx, pp.pathEnd2, pp.pathEnd1) + channelPairs := pp.channelPairs() pp.queuePreInitMessages(cancel) @@ -1542,3 +1547,45 @@ func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { pp.log.Info("Found termination condition for flush, all caches cleared") return true } + +func (pp *PathProcessor) UpdateBTPHeight(ctx context.Context, src *pathEndRuntime, dst *pathEndRuntime) { + srcIsIcon := src.chainProvider.Type() == common.IconModule + dstIsBtpClient := IsBTPLightClient(dst.clientState) + + if !srcIsIcon && !dstIsBtpClient { + return + } + + if srcIsIcon && !dstIsBtpClient || !srcIsIcon && dstIsBtpClient { + pp.log.Error("Src Icon module mismatch with dst btp client", + zap.String("Src Chain Type ", src.chainProvider.Type()), + zap.String("Dst client Id", dst.clientState.ClientID), + ) + return + } + + if src.BTPHeightQueue.Size() == 0 { + return + } + size := src.BTPHeightQueue.Size() + for i := 0; i < size; i++ { + btpHeightInfo, err := src.BTPHeightQueue.GetQueue() + if err != nil { + continue + } + if dst.clientState.ConsensusHeight.RevisionHeight < uint64(btpHeightInfo.Height) { + break + } + if dst.clientState.ConsensusHeight.RevisionHeight == uint64(btpHeightInfo.Height) { + src.BTPHeightQueue.Dequeue() + continue + } + if dst.clientState.ConsensusHeight.RevisionHeight > uint64(btpHeightInfo.Height) { + cs, err := dst.chainProvider.QueryClientConsensusState(ctx, int64(dst.latestBlock.Height), dst.clientState.ClientID, clienttypes.NewHeight(0, uint64(btpHeightInfo.Height))) + if err == nil && cs != nil { + // removing latest height element + src.BTPHeightQueue.Dequeue() + } + } + } +} diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 331c6a70f..f32a636d3 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -3,6 +3,7 @@ package processor import ( "fmt" "sort" + "sync" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/common" @@ -10,6 +11,10 @@ import ( "go.uber.org/zap/zapcore" ) +var ( + zeroIndex = 0 +) + // MessageLifecycle is used to send an initial IBC message to a chain // once the chains are in sync for the PathProcessor. // It also allows setting a stop condition for the PathProcessor. @@ -592,3 +597,106 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey { CounterpartyConnID: info.CounterpartyConnID, } } + +type Queue[T any] interface { + Enqueue(item T) + Dequeue() (T, error) + MustGetQueue() T + GetQueue() (T, error) + ItemExist(interface{}) bool + ReplaceQueue(index int, item T) + Size() int +} + +type ExistenceChecker interface { + Exists(target interface{}) bool +} + +type BlockInfoHeight struct { + Height int64 + IsProcessing bool + RetryCount int64 +} + +func (bi BlockInfoHeight) Exists(target interface{}) bool { + if height, ok := target.(int64); ok { + return bi.Height == height + } + return false +} + +type ArrayQueue[T ExistenceChecker] struct { + items []T + mu *sync.Mutex +} + +func (q *ArrayQueue[T]) Enqueue(item T) { + q.mu.Lock() + defer q.mu.Unlock() + q.items = append(q.items, item) +} + +func (q *ArrayQueue[T]) MustGetQueue() T { + q.mu.Lock() + defer q.mu.Unlock() + if q.Size() == 0 { + panic("the size of queue is zero") + } + + item := q.items[0] + return item +} + +func (q *ArrayQueue[T]) ItemExist(target interface{}) bool { + q.mu.Lock() + defer q.mu.Unlock() + for _, item := range q.items { + if item.Exists(target) { + return true + } + } + return false +} + +func (q *ArrayQueue[T]) GetQueue() (T, error) { + q.mu.Lock() + defer q.mu.Unlock() + if q.Size() == 0 { + var element T + return element, fmt.Errorf("The queue is of empty length") + } + item := q.items[0] + return item, nil + +} + +func (q *ArrayQueue[T]) ReplaceQueue(index int, element T) { + q.mu.Lock() + defer q.mu.Unlock() + if index >= 0 && index < len(q.items) { + q.items[index] = element + } +} + +func (q *ArrayQueue[T]) Dequeue() (T, error) { + q.mu.Lock() + defer q.mu.Unlock() + if q.Size() == 0 { + var element T + return element, fmt.Errorf("all element dequed") + } + item := q.items[0] + q.items = q.items[1:] + return item, nil +} + +func (q *ArrayQueue[T]) Size() int { + return len(q.items) +} + +func NewBlockInfoHeightQueue[T ExistenceChecker]() *ArrayQueue[T] { + return &ArrayQueue[T]{ + items: make([]T, 0), + mu: &sync.Mutex{}, + } +} diff --git a/relayer/processor/utils.go b/relayer/processor/utils.go index 21f1d038a..1837546f9 100644 --- a/relayer/processor/utils.go +++ b/relayer/processor/utils.go @@ -53,3 +53,7 @@ func nextIconIBCHeader(heightMap IBCHeaderCache, height uint64) (provider.IBCHea header, ok := heightMap[nextHeight] return header, ok } + +func FindConsensusHeightFromEventLog([]provider.RelayerEvent) int64 { + return 0 +} diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index a548eae24..94eb8be35 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -445,7 +445,7 @@ type QueryProvider interface { QueryUpgradedConsState(ctx context.Context, height int64) (*clienttypes.QueryConsensusStateResponse, error) QueryConsensusState(ctx context.Context, height int64) (ibcexported.ConsensusState, int64, error) QueryClients(ctx context.Context) (clienttypes.IdentifiedClientStates, error) - + QueryClientPrevConsensusStateHeight(ctx context.Context, chainHeight int64, clinetId string, clientHeight int64) (ibcexported.Height, error) // ics 03 - connection QueryConnection(ctx context.Context, height int64, connectionid string) (*conntypes.QueryConnectionResponse, error) QueryConnections(ctx context.Context) (conns []*conntypes.IdentifiedConnection, err error)