From 520fbe2f2b73254a942e64b4c85e0055bc4d5834 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 23 Aug 2022 00:11:01 -0600 Subject: [PATCH 1/4] bugfix for intermittent channel handshake failure --- relayer/chains/cosmos/message_handlers.go | 18 ++++++++++++++++ relayer/processor/path_end_runtime.go | 16 +++++++------- relayer/processor/path_processor_internal.go | 8 +++---- relayer/processor/types.go | 22 ++++---------------- 4 files changed, 34 insertions(+), 30 deletions(-) diff --git a/relayer/chains/cosmos/message_handlers.go b/relayer/chains/cosmos/message_handlers.go index 10fbd3b0b..092add740 100644 --- a/relayer/chains/cosmos/message_handlers.go +++ b/relayer/chains/cosmos/message_handlers.go @@ -69,6 +69,15 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi } } } + if eventType != chantypes.EventTypeChannelOpenInit { + // Clear out MsgInitKeys once we have the counterparty channel ID + msgInitKey := channelKey.MsgInitKey() + for k := range ccp.channelStateCache { + if k == msgInitKey { + delete(ccp.channelStateCache, k) + } + } + } ibcMessagesCache.ChannelHandshake.Retain(channelKey, eventType, ci) ccp.logChannelMessage(eventType, ci) @@ -79,6 +88,15 @@ func (ccp *CosmosChainProcessor) handleConnectionMessage(eventType string, ci pr connectionKey := processor.ConnectionInfoConnectionKey(ci) open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm) ccp.connectionStateCache[connectionKey] = open + if eventType != conntypes.EventTypeConnectionOpenInit { + // Clear out MsgInitKeys once we have the counterparty connection ID + msgInitKey := connectionKey.MsgInitKey() + for k := range ccp.connectionStateCache { + if k == msgInitKey { + delete(ccp.connectionStateCache, k) + } + } + } ibcMessagesCache.ConnectionHandshake.Retain(connectionKey, eventType, ci) ccp.logConnectionMessage(eventType, ci) diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index aa9a2da23..f99c1953b 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -284,8 +284,8 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func() return } - pathEnd.connectionStateCache.Merge(d.ConnectionStateCache) // Update latest connection open state for chain - pathEnd.channelStateCache.Merge(d.ChannelStateCache) // Update latest channel open state for chain + pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain + pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain pathEnd.mergeMessageCache(d.IBCMessagesCache, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog @@ -428,14 +428,14 @@ func (pathEnd *pathEndRuntime) shouldSendConnectionMessage(message connectionIBC counterpartyKey := k.Counterparty() switch eventType { case conntypes.EventTypeConnectionOpenInit: - toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.msgInitKey()} + toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()} case conntypes.EventTypeConnectionOpenAck: toDeleteCounterparty[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{counterpartyKey} - toDelete[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{k.msgInitKey()} + toDelete[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{k.MsgInitKey()} case conntypes.EventTypeConnectionOpenConfirm: toDeleteCounterparty[conntypes.EventTypeConnectionOpenAck] = []ConnectionKey{counterpartyKey} toDelete[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{k} - toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.msgInitKey()} + toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()} } // delete in progress send for this specific message pathEnd.connProcessing.deleteMessages(map[string][]ConnectionKey{eventType: []ConnectionKey{k}}) @@ -499,14 +499,14 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag counterpartyKey := channelKey.Counterparty() switch eventType { case chantypes.EventTypeChannelOpenTry: - toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.msgInitKey()} + toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()} case chantypes.EventTypeChannelOpenAck: toDeleteCounterparty[chantypes.EventTypeChannelOpenTry] = []ChannelKey{counterpartyKey} - toDelete[chantypes.EventTypeChannelOpenInit] = []ChannelKey{channelKey.msgInitKey()} + toDelete[chantypes.EventTypeChannelOpenInit] = []ChannelKey{channelKey.MsgInitKey()} case chantypes.EventTypeChannelOpenConfirm: toDeleteCounterparty[chantypes.EventTypeChannelOpenAck] = []ChannelKey{counterpartyKey} toDelete[chantypes.EventTypeChannelOpenTry] = []ChannelKey{channelKey} - toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.msgInitKey()} + toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()} case chantypes.EventTypeChannelCloseConfirm: toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey} toDelete[chantypes.EventTypeChannelCloseConfirm] = []ChannelKey{channelKey} diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 18b5ebbcc..7ec5e97b8 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -192,7 +192,7 @@ ConnectionHandshakeLoop: // MsgConnectionOpenInit does not have counterparty connection ID, so check if everything // else matches for counterparty. If so, add counterparty connection ID for // the checks later on in this function. - if openInitKey.ConnectionID == openTryKey.CounterpartyConnID && openInitKey.ClientID == openTryKey.CounterpartyClientID && openInitKey.CounterpartyClientID == openTryKey.ClientID { + if openInitKey == openTryKey.Counterparty().MsgInitKey() { openInitKey.CounterpartyConnID = openTryKey.ConnectionID foundOpenTry = &openTryMsg break @@ -281,7 +281,7 @@ ChannelHandshakeLoop: // MsgChannelOpenInit does not have counterparty channel ID, so check if everything // else matches for counterparty. If so, add counterparty channel ID for // the checks later on in this function. - if openInitKey == openTryKey.Counterparty().msgInitKey() { + if openInitKey == openTryKey.Counterparty().MsgInitKey() { openInitKey.CounterpartyChannelID = openTryMsg.ChannelID foundOpenTry = &openTryMsg break @@ -339,7 +339,7 @@ ChannelHandshakeLoop: res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck], openInitKey) res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm] = append(res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm], openInitKey) // MsgChannelOpenInit does not have CounterpartyChannelID - res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openInitKey.msgInitKey()) + res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openInitKey.MsgInitKey()) } // now iterate through channel-handshake-complete messages and remove any leftover messages @@ -348,7 +348,7 @@ ChannelHandshakeLoop: res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck], openConfirmKey) res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm] = append(res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm], openConfirmKey) // MsgChannelOpenInit does not have CounterpartyChannelID - res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openConfirmKey.msgInitKey()) + res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openConfirmKey.MsgInitKey()) } return res } diff --git a/relayer/processor/types.go b/relayer/processor/types.go index d447b32db..05fa7f411 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -127,9 +127,9 @@ func (k ChannelKey) Counterparty() ChannelKey { } } -// msgInitKey is used for comparing MsgChannelOpenInit keys with other connection +// MsgInitKey is used for comparing MsgChannelOpenInit keys with other connection // handshake messages. MsgChannelOpenInit does not have CounterpartyChannelID. -func (k ChannelKey) msgInitKey() ChannelKey { +func (k ChannelKey) MsgInitKey() ChannelKey { return ChannelKey{ ChannelID: k.ChannelID, PortID: k.PortID, @@ -164,9 +164,9 @@ func (connectionKey ConnectionKey) Counterparty() ConnectionKey { } } -// msgInitKey is used for comparing MsgConnectionOpenInit keys with other connection +// MsgInitKey is used for comparing MsgConnectionOpenInit keys with other connection // handshake messages. MsgConnectionOpenInit does not have CounterpartyConnectionID. -func (connectionKey ConnectionKey) msgInitKey() ConnectionKey { +func (connectionKey ConnectionKey) MsgInitKey() ConnectionKey { return ConnectionKey{ ClientID: connectionKey.ClientID, ConnectionID: connectionKey.ConnectionID, @@ -186,13 +186,6 @@ func (k ConnectionKey) MarshalLogObject(enc zapcore.ObjectEncoder) error { // ChannelStateCache maintains channel open state for multiple channels. type ChannelStateCache map[ChannelKey]bool -// Merge merges another ChannelStateCache into this one, appending messages and updating the Open state. -func (c ChannelStateCache) Merge(other ChannelStateCache) { - for channelKey, newState := range other { - c[channelKey] = newState - } -} - // FilterForClient returns a filtered copy of channels on top of an underlying clientID so it can be used by other goroutines. func (c ChannelStateCache) FilterForClient(clientID string, channelConnections map[string]string, connectionClients map[string]string) ChannelStateCache { n := make(ChannelStateCache) @@ -215,13 +208,6 @@ func (c ChannelStateCache) FilterForClient(clientID string, channelConnections m // ConnectionStateCache maintains connection open state for multiple connections. type ConnectionStateCache map[ConnectionKey]bool -// Merge merges another ChannelStateCache into this one, appending messages and updating the Open state. -func (c ConnectionStateCache) Merge(other ConnectionStateCache) { - for channelKey, newState := range other { - c[channelKey] = newState - } -} - // FilterForClient makes a filtered copy of the ConnectionStateCache // for a single client ID so it can be used by other goroutines. func (c ConnectionStateCache) FilterForClient(clientID string) ConnectionStateCache { From c114593cfc723b8da12fa1a2000584c5ab88d43e Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 23 Aug 2022 12:58:30 -0600 Subject: [PATCH 2/4] Add unit tests for connection and channel state caches --- relayer/chains/cosmos/message_handlers.go | 63 ++++-- .../chains/cosmos/message_handlers_test.go | 180 ++++++++++++++++++ 2 files changed, 222 insertions(+), 21 deletions(-) create mode 100644 relayer/chains/cosmos/message_handlers_test.go diff --git a/relayer/chains/cosmos/message_handlers.go b/relayer/chains/cosmos/message_handlers.go index 75d611e47..7e9f67ee9 100644 --- a/relayer/chains/cosmos/message_handlers.go +++ b/relayer/chains/cosmos/message_handlers.go @@ -56,28 +56,40 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provider.ChannelInfo, ibcMessagesCache processor.IBCMessagesCache) { ccp.channelConnections[ci.ChannelID] = ci.ConnID channelKey := processor.ChannelInfoChannelKey(ci) - switch eventType { - case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry: - ccp.channelStateCache[channelKey] = false - case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm: - ccp.channelStateCache[channelKey] = true - case chantypes.EventTypeChannelCloseConfirm: + + if eventType == chantypes.EventTypeChannelOpenInit { + found := false for k := range ccp.channelStateCache { - if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID { - ccp.channelStateCache[k] = false + // Don't add a channelKey to the channelStateCache without counterparty channel ID + // since we already have the channelKey in the channelStateCache which includes the + // counterparty channel ID. + if k.MsgInitKey() == channelKey { + found = true break } } - } - if eventType != chantypes.EventTypeChannelOpenInit { + if !found { + ccp.channelStateCache[channelKey] = false + } + } else { // Clear out MsgInitKeys once we have the counterparty channel ID - msgInitKey := channelKey.MsgInitKey() - for k := range ccp.channelStateCache { - if k == msgInitKey { - delete(ccp.channelStateCache, k) + switch eventType { + case chantypes.EventTypeChannelOpenTry: + ccp.channelStateCache[channelKey] = false + case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm: + ccp.channelStateCache[channelKey] = true + case chantypes.EventTypeChannelCloseConfirm: + for k := range ccp.channelStateCache { + if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID { + ccp.channelStateCache[k] = false + break + } } } + // Clear out MsgInitKeys once we have the counterparty channel ID + delete(ccp.channelStateCache, channelKey.MsgInitKey()) } + ibcMessagesCache.ChannelHandshake.Retain(channelKey, eventType, ci) ccp.logChannelMessage(eventType, ci) @@ -86,16 +98,25 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi func (ccp *CosmosChainProcessor) handleConnectionMessage(eventType string, ci provider.ConnectionInfo, ibcMessagesCache processor.IBCMessagesCache) { ccp.connectionClients[ci.ConnID] = ci.ClientID connectionKey := processor.ConnectionInfoConnectionKey(ci) - open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm) - ccp.connectionStateCache[connectionKey] = open - if eventType != conntypes.EventTypeConnectionOpenInit { - // Clear out MsgInitKeys once we have the counterparty connection ID - msgInitKey := connectionKey.MsgInitKey() + if eventType == conntypes.EventTypeConnectionOpenInit { + found := false for k := range ccp.connectionStateCache { - if k == msgInitKey { - delete(ccp.connectionStateCache, k) + // Don't add a connectionKey to the connectionStateCache without counterparty connection ID + // since we already have the connectionKey in the connectionStateCache which includes the + // counterparty connection ID. + if k.MsgInitKey() == connectionKey { + found = true + break } } + if !found { + ccp.connectionStateCache[connectionKey] = false + } + } else { + // Clear out MsgInitKeys once we have the counterparty connection ID + delete(ccp.connectionStateCache, connectionKey.MsgInitKey()) + open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm) + ccp.connectionStateCache[connectionKey] = open } ibcMessagesCache.ConnectionHandshake.Retain(connectionKey, eventType, ci) diff --git a/relayer/chains/cosmos/message_handlers_test.go b/relayer/chains/cosmos/message_handlers_test.go new file mode 100644 index 000000000..3ebdedb20 --- /dev/null +++ b/relayer/chains/cosmos/message_handlers_test.go @@ -0,0 +1,180 @@ +package cosmos + +import ( + "testing" + + conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types" + chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types" + "github.com/cosmos/relayer/v2/relayer/processor" + "github.com/cosmos/relayer/v2/relayer/provider" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestConnectionStateCache(t *testing.T) { + var ( + mockConn = "connection-0" + mockCounterpartyConn = "connection-1" + msgOpenInit = provider.ConnectionInfo{ + ConnID: mockConn, + CounterpartyConnID: "", // empty for MsgConnectionOpenInit, different from all other connection handshake messages. + } + msgOpenAck = provider.ConnectionInfo{ + ConnID: mockConn, + CounterpartyConnID: mockCounterpartyConn, // non-empty + } + + // fully populated connectionKey + k = processor.ConnectionInfoConnectionKey(msgOpenAck) + ) + + t.Run("fresh handshake", func(t *testing.T) { + // Emulate scenario of a connection handshake occurring after the relayer is started. + // The connectionStateCache will initially be empty for this connection. The MsgConnectionOpenInit + // will populate the connectionStateCache with a key that has an empty counterparty connection ID. + // The MsgConnectionOpenTry needs to replace this key with a key that has the counterparty connection ID. + + ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}) + c := processor.NewIBCMessagesCache() + + // Observe MsgConnectionOpenInit, which does not have counterparty connection ID. + ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenInit, msgOpenInit, c) + + require.Len(t, ccp.connectionStateCache, 1) + + // The connection state is not open, but the entry should exist in the connectionStateCache. + // MsgInitKey returns the ConnectionKey with an empty counterparty connection ID. + require.False(t, ccp.connectionStateCache[k.MsgInitKey()]) + + // Observe MsgConnectionOpenAck, which does have counterparty connection ID. + ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenAck, msgOpenAck, c) + + // The key with the empty counterparty connection ID should have been removed. + // The key with the counterparty connection ID should have been added. + require.Len(t, ccp.connectionStateCache, 1) + + // The fully populated ConnectionKey should now be the only entry for this connection. + // The connection now open. + require.True(t, ccp.connectionStateCache[k]) + }) + + t.Run("handshake already occurred", func(t *testing.T) { + // Emulate scenario of a connection handshake occurring before the relayer is started, + // but where all connection handshake messages occurred within the initial-block-history. + // This introduces an interesting edge condition where the connectionStateCache will be populated + // with the open connection, including the counterparty connection ID, but then the + // MsgConnectionOpenInit message will be observed afterwards without the counterparty connection ID. + // We need to make sure that the connectionStateCache does not have two keys for the same connection, + // i.e. one key with the counterparty connection ID, and one without. + + ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}) + c := processor.NewIBCMessagesCache() + + // Initialize connectionStateCache with populated connection ID and counterparty connection ID. + // This emulates initializeConnectionState after a recent connection handshake has completed + ccp.connectionStateCache[k] = true + + // Observe MsgConnectionOpenInit, which does not have counterparty connection ID. + ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenInit, msgOpenInit, c) + + // The key with the empty counterparty connection ID should not have been added. + require.Len(t, ccp.connectionStateCache, 1) + + // The fully populated ConnectionKey should still be the only entry for this connection. + // The connection is still marked open since it was open during initializeConnectionState. + require.True(t, ccp.connectionStateCache[k]) + + // Observe MsgConnectionOpenAck, which does have counterparty connection ID. + ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenAck, msgOpenAck, c) + + // Number of keys should still be 1. + require.Len(t, ccp.connectionStateCache, 1) + + // The fully populated ConnectionKey should still be the only entry for this connection. + require.True(t, ccp.connectionStateCache[k]) + }) +} + +func TestChannelStateCache(t *testing.T) { + var ( + mockChan = "channel-0" + mockCounterpartyChan = "channel-1" + msgOpenInit = provider.ChannelInfo{ + ChannelID: mockChan, + CounterpartyChannelID: "", // empty for MsgChannelOpenInit, different from all other channel handshake messages. + } + msgOpenAck = provider.ChannelInfo{ + ChannelID: mockChan, + CounterpartyChannelID: mockCounterpartyChan, // non-empty + } + + // fully populated channelKey + k = processor.ChannelInfoChannelKey(msgOpenAck) + ) + + t.Run("fresh handshake", func(t *testing.T) { + // Emulate scenario of a channel handshake occurring after the relayer is started. + // The channelStateCache will initially be empty for this channel. The MsgChannelOpenInit + // will populate the channelStateCache with a key that has an empty counterparty channel ID. + // The MsgChannelOpenTry needs to replace this key with a key that has the counterparty channel ID. + + ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}) + c := processor.NewIBCMessagesCache() + + // Observe MsgChannelOpenInit, which does not have counterparty channel ID. + ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c) + + require.Len(t, ccp.channelStateCache, 1) + + // The channel state is not open, but the entry should exist in the channelStateCache. + // MsgInitKey returns the ChannelKey with an empty counterparty channel ID. + require.False(t, ccp.channelStateCache[k.MsgInitKey()]) + + // Observe MsgChannelOpenAck, which does have counterparty channel ID. + ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c) + + // The key with the empty counterparty channel ID should have been removed. + // The key with the counterparty channel ID should have been added. + require.Len(t, ccp.channelStateCache, 1) + + // The fully populated ChannelKey should now be the only entry for this channel. + // The channel now open. + require.True(t, ccp.channelStateCache[k]) + }) + + t.Run("handshake already occurred", func(t *testing.T) { + // Emulate scenario of a channel handshake occurring before the relayer is started, + // but where all channel handshake messages occurred within the initial-block-history. + // This introduces an interesting edge condition where the channelStateCache will be populated + // with the open channel, including the counterparty channel ID, but then the + // MsgChannelOpenInit message will be observed afterwards without the counterparty channel ID. + // We need to make sure that the channelStateCache does not have two keys for the same channel, + // i.e. one key with the counterparty channel ID, and one without. + + ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}) + c := processor.NewIBCMessagesCache() + + // Initialize channelStateCache with populated channel ID and counterparty channel ID. + // This emulates initializeChannelState after a recent channel handshake has completed + ccp.channelStateCache[k] = true + + // Observe MsgChannelOpenInit, which does not have counterparty channel ID. + ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c) + + // The key with the empty counterparty channel ID should not have been added. + require.Len(t, ccp.channelStateCache, 1) + + // The fully populated ChannelKey should still be the only entry for this channel. + // The channel is still marked open since it was open during initializeChannelState. + require.True(t, ccp.channelStateCache[k]) + + // Observe MsgChannelOpenAck, which does have counterparty channel ID. + ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c) + + // Number of keys should still be 1. + require.Len(t, ccp.channelStateCache, 1) + + // The fully populated ChannelKey should still be the only entry for this channel. + require.True(t, ccp.channelStateCache[k]) + }) +} From a83eb9f58e6fbe48eb99637199fb68ee1d46128d Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 23 Aug 2022 13:14:33 -0600 Subject: [PATCH 3/4] Log messages in failed txs --- relayer/processor/path_processor_internal.go | 6 ++-- relayer/processor/types_internal.go | 34 ++++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index a527fd5f6..4cacc383d 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -725,12 +725,12 @@ func (pp *PathProcessor) assembleAndSendMessages( dst.trackProcessingChannelMessage(m) } - go pp.sendMessages(ctx, src, dst, om, pp.memo) + go pp.sendMessages(ctx, src, dst, &om, pp.memo) return nil } -func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om outgoingMessages, memo string) { +func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om *outgoingMessages, memo string) { ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() @@ -742,6 +742,7 @@ func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRunt zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), + zap.Object("messages", om), zap.Error(err), ) return @@ -751,6 +752,7 @@ func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRunt zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), + zap.Object("messages", om), zap.Error(err), ) return diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index cc629060a..81e742bc5 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -1,11 +1,13 @@ package processor import ( + "strconv" "strings" "sync" chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" + "go.uber.org/zap/zapcore" ) // pathEndMessages holds the different IBC messages that @@ -196,6 +198,38 @@ type outgoingMessages struct { chanMsgs []channelMessageToTrack } +// MarshalLogObject satisfies the zapcore.ObjectMarshaler interface +// so that you can use zap.Object("messages", r) when logging. +// This is typically useful when logging details about a partially sent result. +func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { + for i, m := range om.pktMsgs { + pfx := "pkt_" + strconv.FormatInt(int64(i), 10) + "_" + enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"src_chan", m.msg.info.SourceChannel) + enc.AddString(pfx+"src_port", m.msg.info.SourcePort) + enc.AddString(pfx+"dst_chan", m.msg.info.DestChannel) + enc.AddString(pfx+"dst_port", m.msg.info.DestPort) + enc.AddString(pfx+"data", string(m.msg.info.Data)) + } + for i, m := range om.connMsgs { + pfx := "conn_" + strconv.FormatInt(int64(i), 10) + "_" + enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"client_id", m.msg.info.ClientID) + enc.AddString(pfx+"conn_id", m.msg.info.ConnID) + enc.AddString(pfx+"cntrprty_client_id", m.msg.info.CounterpartyClientID) + enc.AddString(pfx+"cntrprty_conn_id", m.msg.info.CounterpartyConnID) + } + for i, m := range om.chanMsgs { + pfx := "chan_" + strconv.FormatInt(int64(i), 10) + "_" + enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"chan_id", m.msg.info.ChannelID) + enc.AddString(pfx+"port_id", m.msg.info.PortID) + enc.AddString(pfx+"cntrprty_chan_id", m.msg.info.CounterpartyChannelID) + enc.AddString(pfx+"cntrprty_port_id", m.msg.info.CounterpartyPortID) + } + return nil +} + // Append acquires a lock on om's mutex and then appends msg. // When there are no more possible concurrent calls to Append, // it is safe to directly access om.msgs. From d09ae362eca654519dee3c34c794a28756c4aec9 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 23 Aug 2022 14:12:05 -0600 Subject: [PATCH 4/4] Tidy --- relayer/chains/cosmos/message_handlers.go | 1 - relayer/chains/cosmos/message_handlers_test.go | 2 +- relayer/processor/types_internal.go | 6 +++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/relayer/chains/cosmos/message_handlers.go b/relayer/chains/cosmos/message_handlers.go index 7e9f67ee9..0049ebb4f 100644 --- a/relayer/chains/cosmos/message_handlers.go +++ b/relayer/chains/cosmos/message_handlers.go @@ -72,7 +72,6 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi ccp.channelStateCache[channelKey] = false } } else { - // Clear out MsgInitKeys once we have the counterparty channel ID switch eventType { case chantypes.EventTypeChannelOpenTry: ccp.channelStateCache[channelKey] = false diff --git a/relayer/chains/cosmos/message_handlers_test.go b/relayer/chains/cosmos/message_handlers_test.go index 3ebdedb20..1fa617ae7 100644 --- a/relayer/chains/cosmos/message_handlers_test.go +++ b/relayer/chains/cosmos/message_handlers_test.go @@ -54,7 +54,7 @@ func TestConnectionStateCache(t *testing.T) { require.Len(t, ccp.connectionStateCache, 1) // The fully populated ConnectionKey should now be the only entry for this connection. - // The connection now open. + // The connection should now be open. require.True(t, ccp.connectionStateCache[k]) }) diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index 81e742bc5..f9a56c8f1 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -204,7 +204,7 @@ type outgoingMessages struct { func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { for i, m := range om.pktMsgs { pfx := "pkt_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"event_type", m.msg.eventType) enc.AddString(pfx+"src_chan", m.msg.info.SourceChannel) enc.AddString(pfx+"src_port", m.msg.info.SourcePort) enc.AddString(pfx+"dst_chan", m.msg.info.DestChannel) @@ -213,7 +213,7 @@ func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { } for i, m := range om.connMsgs { pfx := "conn_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"event_type", m.msg.eventType) enc.AddString(pfx+"client_id", m.msg.info.ClientID) enc.AddString(pfx+"conn_id", m.msg.info.ConnID) enc.AddString(pfx+"cntrprty_client_id", m.msg.info.CounterpartyClientID) @@ -221,7 +221,7 @@ func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { } for i, m := range om.chanMsgs { pfx := "chan_" + strconv.FormatInt(int64(i), 10) + "_" - enc.AddString(pfx+"evt_type", m.msg.eventType) + enc.AddString(pfx+"event_type", m.msg.eventType) enc.AddString(pfx+"chan_id", m.msg.info.ChannelID) enc.AddString(pfx+"port_id", m.msg.info.PortID) enc.AddString(pfx+"cntrprty_chan_id", m.msg.info.CounterpartyChannelID)