Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix - intermittent channel handshake failure #937

Merged
merged 6 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 48 additions & 10 deletions relayer/chains/cosmos/message_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,39 @@ func (ccp *CosmosChainProcessor) handlePacketMessage(eventType string, pi provid
func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provider.ChannelInfo, ibcMessagesCache processor.IBCMessagesCache) {
ccp.channelConnections[ci.ChannelID] = ci.ConnID
channelKey := processor.ChannelInfoChannelKey(ci)
switch eventType {
case chantypes.EventTypeChannelOpenInit, chantypes.EventTypeChannelOpenTry:
ccp.channelStateCache[channelKey] = false
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
ccp.channelStateCache[channelKey] = true
case chantypes.EventTypeChannelCloseConfirm:

if eventType == chantypes.EventTypeChannelOpenInit {
found := false
for k := range ccp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
ccp.channelStateCache[k] = false
// Don't add a channelKey to the channelStateCache without counterparty channel ID
// since we already have the channelKey in the channelStateCache which includes the
// counterparty channel ID.
if k.MsgInitKey() == channelKey {
found = true
break
}
}
if !found {
ccp.channelStateCache[channelKey] = false
}
} else {
switch eventType {
case chantypes.EventTypeChannelOpenTry:
ccp.channelStateCache[channelKey] = false
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
ccp.channelStateCache[channelKey] = true
case chantypes.EventTypeChannelCloseConfirm:
for k := range ccp.channelStateCache {
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
ccp.channelStateCache[k] = false
break
}
}
}
// Clear out MsgInitKeys once we have the counterparty channel ID
delete(ccp.channelStateCache, channelKey.MsgInitKey())
}

ibcMessagesCache.ChannelHandshake.Retain(channelKey, eventType, ci)

ccp.logChannelMessage(eventType, ci)
Expand All @@ -77,8 +97,26 @@ func (ccp *CosmosChainProcessor) handleChannelMessage(eventType string, ci provi
func (ccp *CosmosChainProcessor) handleConnectionMessage(eventType string, ci provider.ConnectionInfo, ibcMessagesCache processor.IBCMessagesCache) {
ccp.connectionClients[ci.ConnID] = ci.ClientID
connectionKey := processor.ConnectionInfoConnectionKey(ci)
open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm)
ccp.connectionStateCache[connectionKey] = open
if eventType == conntypes.EventTypeConnectionOpenInit {
found := false
for k := range ccp.connectionStateCache {
// Don't add a connectionKey to the connectionStateCache without counterparty connection ID
// since we already have the connectionKey in the connectionStateCache which includes the
// counterparty connection ID.
if k.MsgInitKey() == connectionKey {
found = true
break
}
}
if !found {
ccp.connectionStateCache[connectionKey] = false
}
} else {
// Clear out MsgInitKeys once we have the counterparty connection ID
delete(ccp.connectionStateCache, connectionKey.MsgInitKey())
open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm)
ccp.connectionStateCache[connectionKey] = open
}
ibcMessagesCache.ConnectionHandshake.Retain(connectionKey, eventType, ci)

ccp.logConnectionMessage(eventType, ci)
Expand Down
180 changes: 180 additions & 0 deletions relayer/chains/cosmos/message_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package cosmos

import (
"testing"

conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types"
chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestConnectionStateCache(t *testing.T) {
jtieri marked this conversation as resolved.
Show resolved Hide resolved
var (
mockConn = "connection-0"
mockCounterpartyConn = "connection-1"
msgOpenInit = provider.ConnectionInfo{
ConnID: mockConn,
CounterpartyConnID: "", // empty for MsgConnectionOpenInit, different from all other connection handshake messages.
}
msgOpenAck = provider.ConnectionInfo{
ConnID: mockConn,
CounterpartyConnID: mockCounterpartyConn, // non-empty
}

// fully populated connectionKey
k = processor.ConnectionInfoConnectionKey(msgOpenAck)
)

t.Run("fresh handshake", func(t *testing.T) {
// Emulate scenario of a connection handshake occurring after the relayer is started.
// The connectionStateCache will initially be empty for this connection. The MsgConnectionOpenInit
// will populate the connectionStateCache with a key that has an empty counterparty connection ID.
// The MsgConnectionOpenTry needs to replace this key with a key that has the counterparty connection ID.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
c := processor.NewIBCMessagesCache()

// Observe MsgConnectionOpenInit, which does not have counterparty connection ID.
ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenInit, msgOpenInit, c)

require.Len(t, ccp.connectionStateCache, 1)

// The connection state is not open, but the entry should exist in the connectionStateCache.
// MsgInitKey returns the ConnectionKey with an empty counterparty connection ID.
require.False(t, ccp.connectionStateCache[k.MsgInitKey()])

// Observe MsgConnectionOpenAck, which does have counterparty connection ID.
ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenAck, msgOpenAck, c)

// The key with the empty counterparty connection ID should have been removed.
// The key with the counterparty connection ID should have been added.
require.Len(t, ccp.connectionStateCache, 1)

// The fully populated ConnectionKey should now be the only entry for this connection.
// The connection should now be open.
require.True(t, ccp.connectionStateCache[k])
})

t.Run("handshake already occurred", func(t *testing.T) {
// Emulate scenario of a connection handshake occurring before the relayer is started,
// but where all connection handshake messages occurred within the initial-block-history.
// This introduces an interesting edge condition where the connectionStateCache will be populated
// with the open connection, including the counterparty connection ID, but then the
// MsgConnectionOpenInit message will be observed afterwards without the counterparty connection ID.
// We need to make sure that the connectionStateCache does not have two keys for the same connection,
// i.e. one key with the counterparty connection ID, and one without.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
c := processor.NewIBCMessagesCache()

// Initialize connectionStateCache with populated connection ID and counterparty connection ID.
// This emulates initializeConnectionState after a recent connection handshake has completed
ccp.connectionStateCache[k] = true

// Observe MsgConnectionOpenInit, which does not have counterparty connection ID.
ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenInit, msgOpenInit, c)

// The key with the empty counterparty connection ID should not have been added.
require.Len(t, ccp.connectionStateCache, 1)

// The fully populated ConnectionKey should still be the only entry for this connection.
// The connection is still marked open since it was open during initializeConnectionState.
require.True(t, ccp.connectionStateCache[k])

// Observe MsgConnectionOpenAck, which does have counterparty connection ID.
ccp.handleConnectionMessage(conntypes.EventTypeConnectionOpenAck, msgOpenAck, c)

// Number of keys should still be 1.
require.Len(t, ccp.connectionStateCache, 1)

// The fully populated ConnectionKey should still be the only entry for this connection.
require.True(t, ccp.connectionStateCache[k])
})
}

// TestChannelStateCache verifies that handleChannelMessage keeps a single
// channelStateCache entry per channel, whether the MsgChannelOpenInit event
// (which carries no counterparty channel ID) is observed before or after the
// fully populated channel key is already present in the cache.
func TestChannelStateCache(t *testing.T) {
	var (
		mockChan             = "channel-0"
		mockCounterpartyChan = "channel-1"
		msgOpenInit          = provider.ChannelInfo{
			ChannelID:             mockChan,
			CounterpartyChannelID: "", // empty for MsgChannelOpenInit, different from all other channel handshake messages.
		}
		msgOpenAck = provider.ChannelInfo{
			ChannelID:             mockChan,
			CounterpartyChannelID: mockCounterpartyChan, // non-empty
		}

		// fully populated channelKey
		k = processor.ChannelInfoChannelKey(msgOpenAck)
	)

	t.Run("fresh handshake", func(t *testing.T) {
		// Emulate scenario of a channel handshake occurring after the relayer is started.
		// The channelStateCache will initially be empty for this channel. The MsgChannelOpenInit
		// will populate the channelStateCache with a key that has an empty counterparty channel ID.
		// A later handshake message that carries the counterparty channel ID (MsgChannelOpenAck
		// in this test) needs to replace this key with a key that has the counterparty channel ID.

		ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
		c := processor.NewIBCMessagesCache()

		// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c)

		require.Len(t, ccp.channelStateCache, 1)

		// The channel state is not open, but the entry should exist in the channelStateCache.
		// MsgInitKey returns the ChannelKey with an empty counterparty channel ID.
		// A plain map index yields false for a missing key as well, so assert presence explicitly.
		open, ok := ccp.channelStateCache[k.MsgInitKey()]
		require.True(t, ok, "MsgInitKey entry should exist in channelStateCache")
		require.False(t, open)

		// Observe MsgChannelOpenAck, which does have counterparty channel ID.
		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)

		// The key with the empty counterparty channel ID should have been removed.
		// The key with the counterparty channel ID should have been added.
		require.Len(t, ccp.channelStateCache, 1)

		// The fully populated ChannelKey should now be the only entry for this channel.
		// The channel is now open.
		require.True(t, ccp.channelStateCache[k])
	})

	t.Run("handshake already occurred", func(t *testing.T) {
		// Emulate scenario of a channel handshake occurring before the relayer is started,
		// but where all channel handshake messages occurred within the initial-block-history.
		// This introduces an interesting edge condition where the channelStateCache will be populated
		// with the open channel, including the counterparty channel ID, but then the
		// MsgChannelOpenInit message will be observed afterwards without the counterparty channel ID.
		// We need to make sure that the channelStateCache does not have two keys for the same channel,
		// i.e. one key with the counterparty channel ID, and one without.

		ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
		c := processor.NewIBCMessagesCache()

		// Initialize channelStateCache with populated channel ID and counterparty channel ID.
		// This emulates initializeChannelState after a recent channel handshake has completed.
		ccp.channelStateCache[k] = true

		// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenInit, msgOpenInit, c)

		// The key with the empty counterparty channel ID should not have been added.
		require.Len(t, ccp.channelStateCache, 1)

		// The fully populated ChannelKey should still be the only entry for this channel.
		// The channel is still marked open since it was open during initializeChannelState.
		require.True(t, ccp.channelStateCache[k])

		// Observe MsgChannelOpenAck, which does have counterparty channel ID.
		ccp.handleChannelMessage(chantypes.EventTypeChannelOpenAck, msgOpenAck, c)

		// Number of keys should still be 1.
		require.Len(t, ccp.channelStateCache, 1)

		// The fully populated ChannelKey should still be the only entry for this channel.
		require.True(t, ccp.channelStateCache[k])
	})
}
16 changes: 8 additions & 8 deletions relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,8 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
return
}

pathEnd.connectionStateCache.Merge(d.ConnectionStateCache) // Update latest connection open state for chain
pathEnd.channelStateCache.Merge(d.ChannelStateCache) // Update latest channel open state for chain
pathEnd.connectionStateCache = d.ConnectionStateCache // Update latest connection open state for chain
pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain

pathEnd.mergeMessageCache(d.IBCMessagesCache, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog

Expand Down Expand Up @@ -428,14 +428,14 @@ func (pathEnd *pathEndRuntime) shouldSendConnectionMessage(message connectionIBC
counterpartyKey := k.Counterparty()
switch eventType {
case conntypes.EventTypeConnectionOpenInit:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.msgInitKey()}
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()}
case conntypes.EventTypeConnectionOpenAck:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{counterpartyKey}
toDelete[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{k.msgInitKey()}
toDelete[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{k.MsgInitKey()}
case conntypes.EventTypeConnectionOpenConfirm:
toDeleteCounterparty[conntypes.EventTypeConnectionOpenAck] = []ConnectionKey{counterpartyKey}
toDelete[conntypes.EventTypeConnectionOpenTry] = []ConnectionKey{k}
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.msgInitKey()}
toDeleteCounterparty[conntypes.EventTypeConnectionOpenInit] = []ConnectionKey{counterpartyKey.MsgInitKey()}
}
// delete in progress send for this specific message
pathEnd.connProcessing.deleteMessages(map[string][]ConnectionKey{eventType: []ConnectionKey{k}})
Expand Down Expand Up @@ -499,14 +499,14 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag
counterpartyKey := channelKey.Counterparty()
switch eventType {
case chantypes.EventTypeChannelOpenTry:
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.msgInitKey()}
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()}
case chantypes.EventTypeChannelOpenAck:
toDeleteCounterparty[chantypes.EventTypeChannelOpenTry] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelOpenInit] = []ChannelKey{channelKey.msgInitKey()}
toDelete[chantypes.EventTypeChannelOpenInit] = []ChannelKey{channelKey.MsgInitKey()}
case chantypes.EventTypeChannelOpenConfirm:
toDeleteCounterparty[chantypes.EventTypeChannelOpenAck] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelOpenTry] = []ChannelKey{channelKey}
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.msgInitKey()}
toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()}
case chantypes.EventTypeChannelCloseConfirm:
toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey}
toDelete[chantypes.EventTypeChannelCloseConfirm] = []ChannelKey{channelKey}
Expand Down
14 changes: 8 additions & 6 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ ConnectionHandshakeLoop:
// MsgConnectionOpenInit does not have counterparty connection ID, so check if everything
// else matches for counterparty. If so, add counterparty connection ID for
// the checks later on in this function.
if openInitKey.ConnectionID == openTryKey.CounterpartyConnID && openInitKey.ClientID == openTryKey.CounterpartyClientID && openInitKey.CounterpartyClientID == openTryKey.ClientID {
if openInitKey == openTryKey.Counterparty().MsgInitKey() {
openInitKey.CounterpartyConnID = openTryKey.ConnectionID
foundOpenTry = &openTryMsg
break
Expand Down Expand Up @@ -281,7 +281,7 @@ ChannelHandshakeLoop:
// MsgChannelOpenInit does not have counterparty channel ID, so check if everything
// else matches for counterparty. If so, add counterparty channel ID for
// the checks later on in this function.
if openInitKey == openTryKey.Counterparty().msgInitKey() {
if openInitKey == openTryKey.Counterparty().MsgInitKey() {
openInitKey.CounterpartyChannelID = openTryMsg.ChannelID
foundOpenTry = &openTryMsg
break
Expand Down Expand Up @@ -339,7 +339,7 @@ ChannelHandshakeLoop:
res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck], openInitKey)
res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm] = append(res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm], openInitKey)
// MsgChannelOpenInit does not have CounterpartyChannelID
res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openInitKey.msgInitKey())
res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openInitKey.MsgInitKey())
}

// now iterate through channel-handshake-complete messages and remove any leftover messages
Expand All @@ -348,7 +348,7 @@ ChannelHandshakeLoop:
res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenAck], openConfirmKey)
res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm] = append(res.ToDeleteDst[chantypes.EventTypeChannelOpenConfirm], openConfirmKey)
// MsgChannelOpenInit does not have CounterpartyChannelID
res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openConfirmKey.msgInitKey())
res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit] = append(res.ToDeleteSrc[chantypes.EventTypeChannelOpenInit], openConfirmKey.MsgInitKey())
}
return res
}
Expand Down Expand Up @@ -725,12 +725,12 @@ func (pp *PathProcessor) assembleAndSendMessages(
dst.trackProcessingChannelMessage(m)
}

go pp.sendMessages(ctx, src, dst, om, pp.memo)
go pp.sendMessages(ctx, src, dst, &om, pp.memo)

return nil
}

func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om outgoingMessages, memo string) {
func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om *outgoingMessages, memo string) {
ctx, cancel := context.WithTimeout(ctx, messageSendTimeout)
defer cancel()

Expand All @@ -742,6 +742,7 @@ func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRunt
zap.String("dst_chain_id", dst.info.ChainID),
zap.String("src_client_id", src.info.ClientID),
zap.String("dst_client_id", dst.info.ClientID),
zap.Object("messages", om),
zap.Error(err),
)
return
Expand All @@ -751,6 +752,7 @@ func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRunt
zap.String("dst_chain_id", dst.info.ChainID),
zap.String("src_client_id", src.info.ClientID),
zap.String("dst_client_id", dst.info.ClientID),
zap.Object("messages", om),
zap.Error(err),
)
return
Expand Down
Loading