From f29a2c7b20a1d9afba4906c592f26dc4add1cb71 Mon Sep 17 00:00:00 2001
From: Andrew Gouin
Date: Mon, 27 Mar 2023 11:10:37 -0600
Subject: [PATCH] Add channel close correlation (#1145)

* Add channel close correlation

* Switch to pre-close key

* make tx channel-close cli command work, add test coverage

* more sweet code removals

* update comment
---
 interchaintest/ica_channel_close_test.go     | 315 +++++++++++++++++++
 interchaintest/interchain_accounts_test.go   |   6 +-
 relayer/channel.go                           |  61 +++-
 relayer/processor/path_end_runtime.go        |  44 ++-
 relayer/processor/path_processor.go          |   6 +-
 relayer/processor/path_processor_internal.go | 295 ++++++++++++-----
 relayer/processor/types.go                   |  12 +
 relayer/processor/types_internal.go          |  27 +-
 8 files changed, 652 insertions(+), 114 deletions(-)
 create mode 100644 interchaintest/ica_channel_close_test.go

diff --git a/interchaintest/ica_channel_close_test.go b/interchaintest/ica_channel_close_test.go
new file mode 100644
index 000000000..30e4dcee0
--- /dev/null
+++ b/interchaintest/ica_channel_close_test.go
@@ -0,0 +1,315 @@
+package interchaintest_test
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/cosmos/cosmos-sdk/crypto/keyring"
+	relayerinterchaintest "github.com/cosmos/relayer/v2/interchaintest"
+	interchaintest "github.com/strangelove-ventures/interchaintest/v7"
+	"github.com/strangelove-ventures/interchaintest/v7/chain/cosmos"
+	"github.com/strangelove-ventures/interchaintest/v7/ibc"
+	"github.com/strangelove-ventures/interchaintest/v7/testreporter"
+	"github.com/strangelove-ventures/interchaintest/v7/testutil"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/zap/zaptest"
+)
+
+// TestScenarioICAChannelClose is very similar to TestScenarioInterchainAccounts,
+// but it additionally tests manually closing the channel using the relayer CLI.
+func TestScenarioICAChannelClose(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
+
+	t.Parallel()
+
+	client, network := interchaintest.DockerSetup(t)
+
+	rep := testreporter.NewNopReporter()
+	eRep := rep.RelayerExecReporter(t)
+
+	ctx := context.Background()
+
+	// Get both chains
+	nf := 0
+	nv := 1
+	cf := interchaintest.NewBuiltinChainFactory(zaptest.NewLogger(t), []*interchaintest.ChainSpec{
+		{
+			Name:          "icad",
+			NumValidators: &nv,
+			NumFullNodes:  &nf,
+			ChainConfig: ibc.ChainConfig{
+				Images:                 []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}},
+				UsingNewGenesisCommand: true,
+			},
+		},
+		{
+			Name:          "icad",
+			NumValidators: &nv,
+			NumFullNodes:  &nf,
+			ChainConfig: ibc.ChainConfig{
+				Images:                 []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}},
+				UsingNewGenesisCommand: true,
+			},
+		},
+	})
+
+	chains, err := cf.Chains(t.Name())
+	require.NoError(t, err)
+
+	chain1, chain2 := chains[0], chains[1]
+
+	// Get a relayer instance
+	r := relayerinterchaintest.
+		NewRelayerFactory(relayerinterchaintest.RelayerConfig{}).
+		Build(t, client, network)
+
+	// Build the network; spin up the chains and configure the relayer
+	const pathName = "test-path"
+	const relayerName = "relayer"
+
+	ic := interchaintest.NewInterchain().
+		AddChain(chain1).
+		AddChain(chain2).
+		AddRelayer(r, relayerName).
+ AddLink(interchaintest.InterchainLink{ + Chain1: chain1, + Chain2: chain2, + Relayer: r, + Path: pathName, + }) + + require.NoError(t, ic.Build(ctx, eRep, interchaintest.InterchainBuildOptions{ + TestName: t.Name(), + Client: client, + NetworkID: network, + SkipPathCreation: true, + BlockDatabaseFile: interchaintest.DefaultBlockDatabaseFilepath(), + })) + + // Fund a user account on chain1 and chain2 + const userFunds = int64(10_000_000_000) + users := interchaintest.GetAndFundTestUsers(t, ctx, t.Name(), userFunds, chain1, chain2) + chain1User := users[0] + chain2User := users[1] + + // Generate a new IBC path + err = r.GeneratePath(ctx, eRep, chain1.Config().ChainID, chain2.Config().ChainID, pathName) + require.NoError(t, err) + + // Create new clients + err = r.CreateClients(ctx, eRep, pathName, ibc.CreateClientOptions{TrustingPeriod: "330h"}) + require.NoError(t, err) + + err = testutil.WaitForBlocks(ctx, 5, chain1, chain2) + require.NoError(t, err) + + // Create a new connection + err = r.CreateConnections(ctx, eRep, pathName) + require.NoError(t, err) + + err = testutil.WaitForBlocks(ctx, 5, chain1, chain2) + require.NoError(t, err) + + // Query for the newly created connection + connections, err := r.GetConnections(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(connections)) + + // Register a new interchain account on chain2, on behalf of the user acc on chain1 + chain1Addr := chain1User.(*cosmos.CosmosWallet).FormattedAddressWithPrefix(chain1.Config().Bech32Prefix) + + registerICA := []string{ + chain1.Config().Bin, "tx", "intertx", "register", + "--from", chain1Addr, + "--connection-id", connections[0].ID, + "--chain-id", chain1.Config().ChainID, + "--home", chain1.HomeDir(), + "--node", chain1.GetRPCAddress(), + "--keyring-backend", keyring.BackendTest, + "-y", + } + _, _, err = chain1.Exec(ctx, registerICA, nil) + require.NoError(t, err) + + // Start the relayer and set the cleanup function. 
+	err = r.StartRelayer(ctx, eRep, pathName)
+	require.NoError(t, err)
+
+	t.Cleanup(
+		func() {
+			err := r.StopRelayer(ctx, eRep)
+			if err != nil {
+				t.Logf("an error occurred while stopping the relayer: %s", err)
+			}
+		},
+	)
+
+	// Wait for relayer to start up and finish channel handshake
+	err = testutil.WaitForBlocks(ctx, 15, chain1, chain2)
+	require.NoError(t, err)
+
+	// Query for the newly registered interchain account
+	queryICA := []string{
+		chain1.Config().Bin, "query", "intertx", "interchainaccounts", connections[0].ID, chain1Addr,
+		"--chain-id", chain1.Config().ChainID,
+		"--home", chain1.HomeDir(),
+		"--node", chain1.GetRPCAddress(),
+	}
+	stdout, _, err := chain1.Exec(ctx, queryICA, nil)
+	require.NoError(t, err)
+
+	icaAddr := parseInterchainAccountField(stdout)
+	require.NotEmpty(t, icaAddr)
+
+	// Get initial account balances
+	chain2Addr := chain2User.(*cosmos.CosmosWallet).FormattedAddressWithPrefix(chain2.Config().Bech32Prefix)
+
+	chain2OrigBal, err := chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+
+	icaOrigBal, err := chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+
+	// Send funds to ICA from user account on chain2
+	const transferAmount = 10000
+	transfer := ibc.WalletAmount{
+		Address: icaAddr,
+		Denom:   chain2.Config().Denom,
+		Amount:  transferAmount,
+	}
+	err = chain2.SendFunds(ctx, chain2User.KeyName(), transfer)
+	require.NoError(t, err)
+
+	// Wait for transfer to be complete and assert balances
+	err = testutil.WaitForBlocks(ctx, 5, chain2)
+	require.NoError(t, err)
+
+	chain2Bal, err := chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, chain2OrigBal-transferAmount, chain2Bal)
+
+	icaBal, err := chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, icaOrigBal+transferAmount, icaBal)
+
+	// Build bank transfer msg
+	rawMsg, err := json.Marshal(map[string]any{
+		"@type":        "/cosmos.bank.v1beta1.MsgSend",
+		"from_address": icaAddr,
+		"to_address":   chain2Addr,
+		"amount": []map[string]any{
+			{
+				"denom":  chain2.Config().Denom,
+				"amount": strconv.Itoa(transferAmount),
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	// Send bank transfer msg to ICA on chain2 from the user account on chain1
+	sendICATransfer := []string{
+		chain1.Config().Bin, "tx", "intertx", "submit", string(rawMsg),
+		"--connection-id", connections[0].ID,
+		"--from", chain1Addr,
+		"--chain-id", chain1.Config().ChainID,
+		"--home", chain1.HomeDir(),
+		"--node", chain1.GetRPCAddress(),
+		"--keyring-backend", keyring.BackendTest,
+		"-y",
+	}
+	_, _, err = chain1.Exec(ctx, sendICATransfer, nil)
+	require.NoError(t, err)
+
+	// Wait for tx to be relayed
+	err = testutil.WaitForBlocks(ctx, 10, chain2)
+	require.NoError(t, err)
+
+	// Assert that the funds have been received by the user account on chain2
+	chain2Bal, err = chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, chain2OrigBal, chain2Bal)
+
+	// Assert that the funds have been removed from the ICA on chain2
+	icaBal, err = chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom)
+	require.NoError(t, err)
+	require.Equal(t, icaOrigBal, icaBal)
+
+	// Stop the relayer and wait for the process to terminate
+	err = r.StopRelayer(ctx, eRep)
+	require.NoError(t, err)
+
+	err = testutil.WaitForBlocks(ctx, 5, chain1, chain2)
+	require.NoError(t, err)
+
+	// Send another bank transfer msg to ICA on chain2 from the user account on
chain1. + // This message should timeout and the channel will be closed when we re-start the relayer. + _, _, err = chain1.Exec(ctx, sendICATransfer, nil) + require.NoError(t, err) + + // Wait for approximately one minute to allow packet timeout threshold to be hit + time.Sleep(70 * time.Second) + + chain1Chans, err := r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain1Chans)) + + // Close the channel using the channel close CLI method + res := r.Exec(ctx, eRep, []string{"tx", "channel-close", pathName, chain1Chans[0].ChannelID, chain1Chans[0].PortID}, nil) + require.NoError(t, res.Err) + require.Zero(t, res.ExitCode) + + // Assert that the packet timed out and that the acc balances are correct + chain2Bal, err = chain2.GetBalance(ctx, chain2Addr, chain2.Config().Denom) + require.NoError(t, err) + require.Equal(t, chain2OrigBal, chain2Bal) + + icaBal, err = chain2.GetBalance(ctx, icaAddr, chain2.Config().Denom) + require.NoError(t, err) + require.Equal(t, icaOrigBal, icaBal) + + // Assert that the channel ends are both closed + chain1Chans, err = r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain1Chans)) + require.Subset(t, []string{"STATE_CLOSED", "Closed"}, []string{chain1Chans[0].State}) + + chain2Chans, err := r.GetChannels(ctx, eRep, chain2.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 1, len(chain2Chans)) + require.Subset(t, []string{"STATE_CLOSED", "Closed"}, []string{chain2Chans[0].State}) + + // Restart the relayer for the next channel handshake + err = r.StartRelayer(ctx, eRep, pathName) + require.NoError(t, err) + + // Attempt to open another channel for the same ICA + _, _, err = chain1.Exec(ctx, registerICA, nil) + require.NoError(t, err) + + // Wait for channel handshake to finish + err = testutil.WaitForBlocks(ctx, 15, chain1, chain2) + require.NoError(t, err) + + // Assert that a new channel has been opened and the same ICA is in use + stdout, _, err = chain1.Exec(ctx, queryICA, nil) + require.NoError(t, err) + + newICA := parseInterchainAccountField(stdout) + require.NotEmpty(t, newICA) + require.Equal(t, icaAddr, newICA) + + chain1Chans, err = r.GetChannels(ctx, eRep, chain1.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 2, len(chain1Chans)) + require.Subset(t, []string{"STATE_OPEN", "Open"}, []string{chain1Chans[1].State}) + + chain2Chans, err = r.GetChannels(ctx, eRep, chain2.Config().ChainID) + require.NoError(t, err) + require.Equal(t, 2, len(chain2Chans)) + require.Subset(t, []string{"STATE_OPEN", "Open"}, []string{chain2Chans[1].State}) +} diff --git a/interchaintest/interchain_accounts_test.go b/interchaintest/interchain_accounts_test.go index 37be21e5d..3da235746 100644 --- a/interchaintest/interchain_accounts_test.go +++ b/interchaintest/interchain_accounts_test.go @@ -44,7 +44,8 @@ func TestScenarioInterchainAccounts(t *testing.T) { NumValidators: &nv, NumFullNodes: &nf, ChainConfig: ibc.ChainConfig{ - Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.3.5"}}, + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, }, }, { @@ -52,7 +53,8 @@ func TestScenarioInterchainAccounts(t *testing.T) { NumValidators: &nv, NumFullNodes: &nf, ChainConfig: ibc.ChainConfig{ - Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", Version: "v0.3.5"}}, + Images: []ibc.DockerImage{{Repository: "ghcr.io/cosmos/ibc-go-icad", 
Version: "v0.5.0"}}, + UsingNewGenesisCommand: true, }, }, }) diff --git a/relayer/channel.go b/relayer/channel.go index f5aa9dc0e..5274d405d 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -114,9 +114,47 @@ func (c *Chain) CloseChannel( // Timeout is per message. Two close channel handshake messages, allowing maxRetries for each. processorTimeout := timeout * 2 * time.Duration(maxRetries) + // Perform a flush first so that any timeouts are cleared. + flushCtx, flushCancel := context.WithTimeout(ctx, processorTimeout) + defer flushCancel() + + flushProcessor := processor.NewEventProcessor(). + WithChainProcessors( + c.chainProcessor(c.log, nil), + dst.chainProcessor(c.log, nil), + ). + WithPathProcessors(processor.NewPathProcessor( + c.log, + processor.NewPathEnd(pathName, c.PathEnd.ChainID, c.PathEnd.ClientID, "", []processor.ChainChannelKey{}), + processor.NewPathEnd(pathName, dst.PathEnd.ChainID, dst.PathEnd.ClientID, "", []processor.ChainChannelKey{}), + nil, + memo, + DefaultClientUpdateThreshold, + DefaultFlushInterval, + )). + WithInitialBlockHistory(0). + WithMessageLifecycle(&processor.FlushLifecycle{}). + Build() + + c.log.Info("Starting event processor for flush before channel close", + zap.String("src_chain_id", c.PathEnd.ChainID), + zap.String("src_port_id", srcPortID), + zap.String("dst_chain_id", dst.PathEnd.ChainID), + ) + + if err := flushProcessor.Run(flushCtx); err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, processorTimeout) defer cancel() + c.log.Info("Starting event processor for channel close", + zap.String("src_chain_id", c.PathEnd.ChainID), + zap.String("src_port_id", srcPortID), + zap.String("dst_chain_id", dst.PathEnd.ChainID), + ) + return processor.NewEventProcessor(). WithChainProcessors( c.chainProcessor(c.log, nil), @@ -132,23 +170,12 @@ func (c *Chain) CloseChannel( DefaultFlushInterval, )). WithInitialBlockHistory(0). - WithMessageLifecycle(&processor.ChannelMessageLifecycle{ - Initial: &processor.ChannelMessage{ - ChainID: c.PathEnd.ChainID, - EventType: chantypes.EventTypeChannelCloseInit, - Info: provider.ChannelInfo{ - PortID: srcPortID, - ChannelID: srcChanID, - }, - }, - Termination: &processor.ChannelMessage{ - ChainID: dst.PathEnd.ChainID, - EventType: chantypes.EventTypeChannelCloseConfirm, - Info: provider.ChannelInfo{ - CounterpartyPortID: srcPortID, - CounterpartyChannelID: srcChanID, - }, - }, + WithMessageLifecycle(&processor.ChannelCloseLifecycle{ + SrcChainID: c.PathEnd.ChainID, + SrcChannelID: srcChanID, + SrcPortID: srcPortID, + SrcConnID: c.PathEnd.ConnectionID, + DstConnID: dst.PathEnd.ConnectionID, }). Build(). 
 		Run(ctx)
diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go
index a9d26d6aa..9796ade1c 100644
--- a/relayer/processor/path_end_runtime.go
+++ b/relayer/processor/path_end_runtime.go
@@ -232,6 +232,12 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache
 		foundCounterpartyChannelID := m.Termination.Info.CounterpartyChannelID == ""
 		foundCounterpartyPortID := m.Termination.Info.CounterpartyPortID == ""
 		for _, ci := range cache {
+			pathEnd.log.Info("Channel handshake termination candidate",
+				zap.String("termination_port_id", m.Termination.Info.PortID),
+				zap.String("observed_port_id", ci.PortID),
+				zap.String("termination_counterparty_port_id", m.Termination.Info.CounterpartyPortID),
+				zap.String("observed_counterparty_port_id", ci.CounterpartyPortID),
+			)
 			if ci.ChannelID == m.Termination.Info.ChannelID {
 				foundChannelID = true
 			}
@@ -249,6 +255,41 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache
 			pathEnd.log.Info("Found termination condition for channel handshake")
 			return true
 		}
+	case *ChannelCloseLifecycle:
+		cache, ok := ibcMessagesCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm]
+		if !ok {
+			return false
+		}
+		// check observed close-confirm events against m.SrcChannelID and m.SrcPortID
+		foundChannelID := m.SrcChannelID == ""
+		foundPortID := m.SrcPortID == ""
+		for _, ci := range cache {
+			pathEnd.log.Info("Channel close termination candidate",
+				zap.String("termination_port_id", m.SrcPortID),
+				zap.String("observed_port_id", ci.PortID),
+				zap.String("termination_channel_id", m.SrcChannelID),
+				zap.String("observed_channel_id", ci.ChannelID),
+			)
+			if pathEnd.info.ChainID == m.SrcChainID {
+				if ci.ChannelID == m.SrcChannelID {
+					foundChannelID = true
+				}
+				if ci.PortID == m.SrcPortID {
+					foundPortID = true
+				}
+			} else {
+				if ci.CounterpartyChannelID == m.SrcChannelID {
+					foundChannelID = true
+				}
+				if ci.CounterpartyPortID == m.SrcPortID {
+					foundPortID = true
+				}
+			}
+		}
+		if foundChannelID && foundPortID {
+			pathEnd.log.Info("Found termination condition for channel close")
+			return true
+		}
 	case *ConnectionMessageLifecycle:
 		if m.Termination == nil || m.Termination.ChainID != pathEnd.info.ChainID {
 			return false
@@ -620,8 +661,9 @@ func (pathEnd *pathEndRuntime) shouldSendChannelMessage(message channelIBCMessag
 		toDeleteCounterparty[chantypes.EventTypeChannelOpenInit] = []ChannelKey{counterpartyKey.MsgInitKey()}
 		toDeleteCounterparty[preInitKey] = []ChannelKey{counterpartyKey.MsgInitKey()}
 	case chantypes.EventTypeChannelCloseConfirm:
-		toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey}
 		toDelete[chantypes.EventTypeChannelCloseConfirm] = []ChannelKey{channelKey}
+		toDeleteCounterparty[chantypes.EventTypeChannelCloseInit] = []ChannelKey{counterpartyKey}
+		toDeleteCounterparty[preCloseKey] = []ChannelKey{counterpartyKey}

 		// Gather relevant send packet messages, for this channel key, that should be deleted if we
 		// are operating on an ordered channel.
diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 3fb8f6b95..976037ba1 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -180,7 +180,7 @@ func (pp *PathProcessor) channelPairs() []channelPair { } pairs := make([]channelPair, len(channels)) i := 0 - for k, _ := range channels { + for k := range channels { pairs[i] = channelPair{ pathEnd1ChannelKey: k, pathEnd2ChannelKey: k.Counterparty(), @@ -321,13 +321,13 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) { if pp.shouldFlush() && !pp.initialFlushComplete { pp.flush(ctx) pp.initialFlushComplete = true - } else if pp.shouldTerminateForFlushComplete(ctx, cancel) { + } else if pp.shouldTerminateForFlushComplete() { cancel() return } // process latest message cache state from both pathEnds - if err := pp.processLatestMessages(ctx); err != nil { + if err := pp.processLatestMessages(ctx, cancel); err != nil { // in case of IBC message send errors, schedule retry after durationErrorRetry if retryTimer != nil { retryTimer.Stop() diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 9ee4c2a72..7112d46c8 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -17,7 +17,10 @@ import ( // preInitKey is used to declare intent to initialize a connection or channel handshake // i.e. a MsgConnectionOpenInit or a MsgChannelOpenInit should be broadcasted to start // the handshake if this key exists in the relevant cache. -const preInitKey = "pre_init" +const ( + preInitKey = "pre_init" + preCloseKey = "pre_close" +) // getMessagesToSend returns only the lowest sequence message (if it should be sent) for ordered channels, // otherwise returns all which should be sent. @@ -123,39 +126,21 @@ func (pp *PathProcessor) unrelayedPacketFlowMessages( for seq, info := range pathEndPacketFlowMessages.SrcMsgTimeout { deletePreInitIfMatches(info) toDeleteSrc[chantypes.EventTypeSendPacket] = append(toDeleteSrc[chantypes.EventTypeSendPacket], seq) + toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) if info.ChannelOrder == chantypes.ORDERED.String() { - // For ordered channel packets, flow is not done until channel-close-confirm is observed. - if pathEndPacketFlowMessages.DstMsgChannelCloseConfirm == nil { - // have not observed a channel-close-confirm yet for this channel, send it if ready. - // will come back through here next block if not yet ready. - closeChan := channelIBCMessage{ - eventType: chantypes.EventTypeChannelCloseConfirm, - info: provider.ChannelInfo{ - Height: info.Height, - PortID: info.SourcePort, - ChannelID: info.SourceChannel, - CounterpartyPortID: info.DestPort, - CounterpartyChannelID: info.DestChannel, - Order: orderFromString(info.ChannelOrder), - }, - } - - if pathEndPacketFlowMessages.Dst.shouldSendChannelMessage(closeChan, pathEndPacketFlowMessages.Src) { - res.DstChannelMessage = append(res.DstChannelMessage, closeChan) - } - } else { - // ordered channel, and we have a channel close confirm, so packet-flow and channel-close-flow is complete. - // remove all retention of this sequence number and this channel-close-confirm. 
- toDeleteDstChannel[chantypes.EventTypeChannelCloseConfirm] = append( - toDeleteDstChannel[chantypes.EventTypeChannelCloseConfirm], - k.Counterparty(), - ) - toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) + // Channel is now closed on src. + // enqueue channel close init observation to be handled by channel close correlation + if _, ok := pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok { + pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache) + } + pathEndPacketFlowMessages.Src.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{ + Height: info.Height, + PortID: info.SourcePort, + ChannelID: info.SourceChannel, + CounterpartyPortID: info.DestPort, + CounterpartyChannelID: info.DestChannel, + Order: orderFromString(info.ChannelOrder), } - } else { - // unordered channel, and we have a timeout for this packet, so packet flow is complete - // remove all retention of this sequence number - toDeleteSrc[chantypes.EventTypeTimeoutPacket] = append(toDeleteSrc[chantypes.EventTypeTimeoutPacket], seq) } } @@ -488,6 +473,76 @@ func (pp *PathProcessor) unrelayedChannelHandshakeMessages( return res } +func (pp *PathProcessor) unrelayedChannelCloseMessages( + pathEndChannelCloseMessages pathEndChannelCloseMessages, +) pathEndChannelHandshakeResponse { + var ( + res pathEndChannelHandshakeResponse + toDeleteSrc = make(map[string][]ChannelKey) + toDeleteDst = make(map[string][]ChannelKey) + ) + processRemovals := func() { + pathEndChannelCloseMessages.Src.messageCache.ChannelHandshake.DeleteMessages(toDeleteSrc) + pathEndChannelCloseMessages.Dst.messageCache.ChannelHandshake.DeleteMessages(toDeleteDst) + pathEndChannelCloseMessages.Src.channelProcessing.deleteMessages(toDeleteSrc) + pathEndChannelCloseMessages.Dst.channelProcessing.deleteMessages(toDeleteDst) + toDeleteSrc = make(map[string][]ChannelKey) + toDeleteDst = make(map[string][]ChannelKey) + } + + for chanKey := range pathEndChannelCloseMessages.DstMsgChannelCloseConfirm { + // found close confirm, channel handshake complete. 
remove all retention + + counterpartyKey := chanKey.Counterparty() + toDeleteDst[chantypes.EventTypeChannelCloseConfirm] = append( + toDeleteDst[chantypes.EventTypeChannelCloseConfirm], + chanKey, + ) + // MsgChannelCloseInit does not have CounterpartyChannelID // TODO: confirm this + toDeleteSrc[chantypes.EventTypeChannelCloseInit] = append( + toDeleteSrc[chantypes.EventTypeChannelCloseInit], + counterpartyKey.MsgInitKey(), + ) + // TODO: confirm chankey does not need modification + toDeleteSrc[preCloseKey] = append(toDeleteSrc[preCloseKey], counterpartyKey) + } + + processRemovals() + + for chanKey, info := range pathEndChannelCloseMessages.SrcMsgChannelCloseInit { + // need to send a close confirm to dst + msgCloseConfirm := channelIBCMessage{ + eventType: chantypes.EventTypeChannelCloseConfirm, + info: info, + } + if pathEndChannelCloseMessages.Dst.shouldSendChannelMessage( + msgCloseConfirm, pathEndChannelCloseMessages.Src, + ) { + res.DstMessages = append(res.DstMessages, msgCloseConfirm) + } + + // TODO: confirm chankey does not need modification + toDeleteSrc[preCloseKey] = append(toDeleteSrc[preCloseKey], chanKey) + } + + processRemovals() + + for _, info := range pathEndChannelCloseMessages.SrcMsgChannelPreInit { + // need to send a close init to src + msgCloseInit := channelIBCMessage{ + eventType: chantypes.EventTypeChannelCloseInit, + info: info, + } + if pathEndChannelCloseMessages.Src.shouldSendChannelMessage( + msgCloseInit, pathEndChannelCloseMessages.Dst, + ) { + res.SrcMessages = append(res.SrcMessages, msgCloseInit) + } + } + + return res +} + func (pp *PathProcessor) getUnrelayedClientICQMessages(pathEnd *pathEndRuntime, queryMessages, responseMessages ClientICQMessageCache) (res []clientICQMessage) { ClientICQLoop: for queryID, queryMsg := range queryMessages { @@ -564,7 +619,7 @@ var observedEventTypeForDesiredMessage = map[string]string{ chantypes.EventTypeSendPacket: preInitKey, } -func (pp *PathProcessor) queuePreInitMessages() { +func (pp *PathProcessor) queuePreInitMessages(cancel func()) { if pp.messageLifecycle == nil || pp.sentInitialMsg { return } @@ -582,6 +637,7 @@ func (pp *PathProcessor) queuePreInitMessages() { zap.Inline(channelKey), zap.Error(err), ) + cancel() return } if !pp.IsRelayedChannel(m.Initial.ChainID, channelKey) { @@ -593,6 +649,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial connection message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } if m.Initial.ChainID == pp.pathEnd1.info.ChainID { @@ -622,6 +679,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial connection message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } connKey := ConnectionInfoConnectionKey(m.Initial.Info) @@ -652,6 +710,7 @@ func (pp *PathProcessor) queuePreInitMessages() { "Failed to queue initial channel message, event type not handled", zap.String("event_type", m.Initial.EventType), ) + cancel() return } chanKey := ChannelInfoChannelKey(m.Initial.Info) @@ -660,7 +719,6 @@ func (pp *PathProcessor) queuePreInitMessages() { if !ok { pp.pathEnd1.messageCache.ChannelHandshake[eventType] = make(ChannelMessageCache) } - pp.pathEnd1.messageCache.ChannelHandshake[eventType][chanKey] = m.Initial.Info } else if m.Initial.ChainID == pp.pathEnd2.info.ChainID { _, ok = pp.pathEnd2.messageCache.ChannelHandshake[eventType] @@ -669,18 +727,81 @@ func (pp *PathProcessor) queuePreInitMessages() { } 
 			pp.pathEnd2.messageCache.ChannelHandshake[eventType][chanKey] = m.Initial.Info
 		}
+	case *ChannelCloseLifecycle:
+		pp.sentInitialMsg = true
+
+		if !pp.IsRelevantConnection(pp.pathEnd1.info.ChainID, m.SrcConnID) {
+			return
+		}
+
+		for k, open := range pp.pathEnd1.channelStateCache {
+			if k.ChannelID == m.SrcChannelID && k.PortID == m.SrcPortID && k.CounterpartyChannelID != "" && k.CounterpartyPortID != "" {
+				if open {
+					// channel is still open on pathEnd1
+					break
+				}
+				if counterpartyOpen, ok := pp.pathEnd2.channelStateCache[k.Counterparty()]; ok && !counterpartyOpen {
+					pp.log.Info("Channel already closed on both sides")
+					cancel()
+					return
+				}
+				// queue channel close init on pathEnd1
+				if _, ok := pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok {
+					pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache)
+				}
+				pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{
+					PortID:                k.PortID,
+					ChannelID:             k.ChannelID,
+					CounterpartyPortID:    k.CounterpartyPortID,
+					CounterpartyChannelID: k.CounterpartyChannelID,
+					ConnID:                m.SrcConnID,
+				}
+				return
+			}
+		}
+
+		for k, open := range pp.pathEnd2.channelStateCache {
+			if k.CounterpartyChannelID == m.SrcChannelID && k.CounterpartyPortID == m.SrcPortID && k.ChannelID != "" && k.PortID != "" {
+				if open {
+					// channel is still open on pathEnd2
+					break
+				}
+				if counterpartyChanState, ok := pp.pathEnd1.channelStateCache[k.Counterparty()]; ok && !counterpartyChanState {
+					pp.log.Info("Channel already closed on both sides")
+					cancel()
+					return
+				}
+				// queue channel close init on pathEnd2
+				if _, ok := pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit]; !ok {
+					pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit] = make(ChannelMessageCache)
+				}
+				pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit][k] = provider.ChannelInfo{
+					PortID:                k.PortID,
+					ChannelID:             k.ChannelID,
+					CounterpartyPortID:    k.CounterpartyPortID,
+					CounterpartyChannelID: k.CounterpartyChannelID,
+					ConnID:                m.DstConnID,
+				}
+				return
+			}
+		}
+
+		pp.log.Error("Unable to close channel. Channel must already be closed on one chain.",
+			zap.String("src_channel_id", m.SrcChannelID),
+			zap.String("src_port_id", m.SrcPortID),
+		)
+		cancel()
 	}
 }

 // messages from both pathEnds are needed in order to determine what needs to be relayed for a single pathEnd
-func (pp *PathProcessor) processLatestMessages(ctx context.Context) error {
+func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func()) error {
 	// Update trusted client state for both pathends
 	pp.updateClientTrustedState(pp.pathEnd1, pp.pathEnd2)
 	pp.updateClientTrustedState(pp.pathEnd2, pp.pathEnd1)

 	channelPairs := pp.channelPairs()

-	pp.queuePreInitMessages()
+	pp.queuePreInitMessages(cancel)

 	pathEnd1ConnectionHandshakeMessages := pathEndConnectionHandshakeMessages{
 		Src: pp.pathEnd1,
@@ -729,20 +850,6 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context) error {
 	pathEnd2ProcessRes := make([]pathEndPacketFlowResponse, len(channelPairs))

 	for i, pair := range channelPairs {
-		var pathEnd1ChannelCloseConfirm, pathEnd2ChannelCloseConfirm *provider.ChannelInfo
-
-		if pathEnd1ChanCloseConfirmMsgs, ok := pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm]; ok {
-			if pathEnd1ChannelCloseConfirmMsg, ok := pathEnd1ChanCloseConfirmMsgs[pair.pathEnd1ChannelKey]; ok {
-				pathEnd1ChannelCloseConfirm = &pathEnd1ChannelCloseConfirmMsg
-			}
-		}
-
-		if pathEnd2ChanCloseConfirmMsgs, ok := pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm]; ok {
-			if pathEnd2ChannelCloseConfirmMsg, ok := pathEnd2ChanCloseConfirmMsgs[pair.pathEnd2ChannelKey]; ok {
-				pathEnd2ChannelCloseConfirm = &pathEnd2ChannelCloseConfirmMsg
-			}
-		}
-
 		// Append acks into recv packet info if present
 		pathEnd1DstMsgRecvPacket := pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeRecvPacket]
 		for seq, ackInfo := range pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeWriteAck] {
@@ -761,37 +868,55 @@
 		}

 		pathEnd1PacketFlowMessages := pathEndPacketFlowMessages{
-			Src:                       pp.pathEnd1,
-			Dst:                       pp.pathEnd2,
-			ChannelKey:                pair.pathEnd1ChannelKey,
-			SrcPreTransfer:            pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey],
-			SrcMsgTransfer:            pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeSendPacket],
-			DstMsgRecvPacket:          pathEnd1DstMsgRecvPacket,
-			SrcMsgAcknowledgement:     pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeAcknowledgePacket],
-			SrcMsgTimeout:             pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacket],
-			SrcMsgTimeoutOnClose:      pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacketOnClose],
-			DstMsgChannelCloseConfirm: pathEnd2ChannelCloseConfirm,
+			Src:                   pp.pathEnd1,
+			Dst:                   pp.pathEnd2,
+			ChannelKey:            pair.pathEnd1ChannelKey,
+			SrcPreTransfer:        pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey],
+			SrcMsgTransfer:        pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeSendPacket],
+			DstMsgRecvPacket:      pathEnd1DstMsgRecvPacket,
+			SrcMsgAcknowledgement: pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeAcknowledgePacket],
+			SrcMsgTimeout:         pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacket],
+			SrcMsgTimeoutOnClose:  pp.pathEnd1.messageCache.PacketFlow[pair.pathEnd1ChannelKey][chantypes.EventTypeTimeoutPacketOnClose],
 		}

 		pathEnd2PacketFlowMessages := pathEndPacketFlowMessages{
-			Src:                       pp.pathEnd2,
-			Dst:                       pp.pathEnd1,
-			ChannelKey:                pair.pathEnd2ChannelKey,
-			SrcPreTransfer:            pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd1ChannelKey][preInitKey],
-			SrcMsgTransfer:            pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeSendPacket],
-			DstMsgRecvPacket:          pathEnd2DstMsgRecvPacket,
-			SrcMsgAcknowledgement:     pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeAcknowledgePacket],
-			SrcMsgTimeout:             pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacket],
-			SrcMsgTimeoutOnClose:      pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacketOnClose],
-			DstMsgChannelCloseConfirm: pathEnd1ChannelCloseConfirm,
+			Src:                   pp.pathEnd2,
+			Dst:                   pp.pathEnd1,
+			ChannelKey:            pair.pathEnd2ChannelKey,
+			SrcPreTransfer:        pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][preInitKey],
+			SrcMsgTransfer:        pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeSendPacket],
+			DstMsgRecvPacket:      pathEnd2DstMsgRecvPacket,
+			SrcMsgAcknowledgement: pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeAcknowledgePacket],
+			SrcMsgTimeout:         pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacket],
+			SrcMsgTimeoutOnClose:  pp.pathEnd2.messageCache.PacketFlow[pair.pathEnd2ChannelKey][chantypes.EventTypeTimeoutPacketOnClose],
 		}

 		pathEnd1ProcessRes[i] = pp.unrelayedPacketFlowMessages(ctx, pathEnd1PacketFlowMessages)
 		pathEnd2ProcessRes[i] = pp.unrelayedPacketFlowMessages(ctx, pathEnd2PacketFlowMessages)
 	}

+	pathEnd1ChannelCloseMessages := pathEndChannelCloseMessages{
+		Src:                       pp.pathEnd1,
+		Dst:                       pp.pathEnd2,
+		SrcMsgChannelPreInit:      pp.pathEnd1.messageCache.ChannelHandshake[preCloseKey],
+		SrcMsgChannelCloseInit:    pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit],
+		DstMsgChannelCloseConfirm: pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm],
+	}
+	pathEnd2ChannelCloseMessages := pathEndChannelCloseMessages{
+		Src:                       pp.pathEnd2,
+		Dst:                       pp.pathEnd1,
+		SrcMsgChannelPreInit:      pp.pathEnd2.messageCache.ChannelHandshake[preCloseKey],
+		SrcMsgChannelCloseInit:    pp.pathEnd2.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseInit],
+		DstMsgChannelCloseConfirm: pp.pathEnd1.messageCache.ChannelHandshake[chantypes.EventTypeChannelCloseConfirm],
+	}
+	pathEnd1ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd1ChannelCloseMessages)
+	pathEnd2ChannelCloseRes := pp.unrelayedChannelCloseMessages(pathEnd2ChannelCloseMessages)
+
 	// concatenate applicable messages for pathend
 	pathEnd1ConnectionMessages, pathEnd2ConnectionMessages := pp.connectionMessagesToSend(pathEnd1ConnectionHandshakeRes, pathEnd2ConnectionHandshakeRes)
-	pathEnd1ChannelMessages, pathEnd2ChannelMessages := pp.channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes)
+	pathEnd1ChannelMessages, pathEnd2ChannelMessages := pp.channelMessagesToSend(
+		pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes,
+		pathEnd1ChannelCloseRes, pathEnd2ChannelCloseRes,
+	)

 	pathEnd1PacketMessages, pathEnd2PacketMessages, pathEnd1ChanCloseMessages, pathEnd2ChanCloseMessages := pp.packetMessagesToSend(channelPairs, pathEnd1ProcessRes, pathEnd2ProcessRes)
 	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChanCloseMessages...)
@@ -836,21 +961,31 @@
 	return eg.Wait()
 }

-func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) {
-	pathEnd1ChannelSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages)
-	pathEnd1ChannelDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages)
-	pathEnd2ChannelDstLen := len(pathEnd2ChannelHandshakeRes.DstMessages)
-	pathEnd2ChannelSrcLen := len(pathEnd2ChannelHandshakeRes.SrcMessages)
-	pathEnd1ChannelMessages := make([]channelIBCMessage, 0, pathEnd1ChannelSrcLen+pathEnd2ChannelDstLen)
-	pathEnd2ChannelMessages := make([]channelIBCMessage, 0, pathEnd2ChannelSrcLen+pathEnd1ChannelDstLen)
+func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes, pathEnd1ChannelCloseRes, pathEnd2ChannelCloseRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) {
+	pathEnd1ChannelOpenSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages)
+	pathEnd1ChannelOpenDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages)
+	pathEnd2ChannelOpenDstLen := len(pathEnd2ChannelHandshakeRes.DstMessages)
+	pathEnd2ChannelOpenSrcLen := len(pathEnd2ChannelHandshakeRes.SrcMessages)
+
+	pathEnd1ChannelCloseSrcLen := len(pathEnd1ChannelCloseRes.SrcMessages)
+	pathEnd1ChannelCloseDstLen := len(pathEnd1ChannelCloseRes.DstMessages)
+	pathEnd2ChannelCloseDstLen := len(pathEnd2ChannelCloseRes.DstMessages)
+	pathEnd2ChannelCloseSrcLen := len(pathEnd2ChannelCloseRes.SrcMessages)
+
+	pathEnd1ChannelMessages := make([]channelIBCMessage, 0, pathEnd1ChannelOpenSrcLen+pathEnd2ChannelOpenDstLen+pathEnd1ChannelCloseSrcLen+pathEnd2ChannelCloseDstLen)
+	pathEnd2ChannelMessages := make([]channelIBCMessage, 0, pathEnd2ChannelOpenSrcLen+pathEnd1ChannelOpenDstLen+pathEnd2ChannelCloseSrcLen+pathEnd1ChannelCloseDstLen)

 	// pathEnd1 channel messages come from pathEnd1 src and pathEnd2 dst
 	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd2ChannelHandshakeRes.DstMessages...)
+	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd2ChannelCloseRes.DstMessages...)
 	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChannelHandshakeRes.SrcMessages...)
+	pathEnd1ChannelMessages = append(pathEnd1ChannelMessages, pathEnd1ChannelCloseRes.SrcMessages...)

 	// pathEnd2 channel messages come from pathEnd2 src and pathEnd1 dst
 	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd1ChannelHandshakeRes.DstMessages...)
+	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd1ChannelCloseRes.DstMessages...)
 	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd2ChannelHandshakeRes.SrcMessages...)
+	pathEnd2ChannelMessages = append(pathEnd2ChannelMessages, pathEnd2ChannelCloseRes.SrcMessages...)

 	return pathEnd1ChannelMessages, pathEnd2ChannelMessages
 }
@@ -1094,9 +1229,7 @@ func (pp *PathProcessor) flush(ctx context.Context) {

 // shouldTerminateForFlushComplete will determine if the relayer should exit
 // when FlushLifecycle is used. It will exit when all of the message caches are cleared.
-func (pp *PathProcessor) shouldTerminateForFlushComplete( - ctx context.Context, cancel func(), -) bool { +func (pp *PathProcessor) shouldTerminateForFlushComplete() bool { if _, ok := pp.messageLifecycle.(*FlushLifecycle); !ok { return false } diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 347974800..3f4059b7b 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -72,6 +72,18 @@ type ChannelMessageLifecycle struct { func (t *ChannelMessageLifecycle) messageLifecycler() {} +// ChannelCloseLifecycle is used as a stop condition for the PathProcessor. +// It will attempt to finish closing the channel and terminate once the channel is closed. +type ChannelCloseLifecycle struct { + SrcChainID string + SrcChannelID string + SrcPortID string + SrcConnID string + DstConnID string +} + +func (t *ChannelCloseLifecycle) messageLifecycler() {} + // IBCMessagesCache holds cached messages for packet flows, connection handshakes, // and channel handshakes. The PathProcessors use this for message correlation to determine // when messages should be sent and are pruned when flows/handshakes are complete. diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index 66c56edac..d526ed70a 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -376,16 +376,15 @@ type clientICQProcessingCache map[provider.ClientICQQueryID]processingMessage // contains MsgRecvPacket from counterparty // entire packet flow type pathEndPacketFlowMessages struct { - Src *pathEndRuntime - Dst *pathEndRuntime - ChannelKey ChannelKey - SrcPreTransfer PacketSequenceCache - SrcMsgTransfer PacketSequenceCache - DstMsgRecvPacket PacketSequenceCache - SrcMsgAcknowledgement PacketSequenceCache - SrcMsgTimeout PacketSequenceCache - SrcMsgTimeoutOnClose PacketSequenceCache - DstMsgChannelCloseConfirm *provider.ChannelInfo + Src *pathEndRuntime + Dst *pathEndRuntime + ChannelKey ChannelKey + SrcPreTransfer PacketSequenceCache + SrcMsgTransfer PacketSequenceCache + DstMsgRecvPacket PacketSequenceCache + SrcMsgAcknowledgement PacketSequenceCache + SrcMsgTimeout PacketSequenceCache + SrcMsgTimeoutOnClose PacketSequenceCache } type pathEndConnectionHandshakeMessages struct { @@ -408,6 +407,14 @@ type pathEndChannelHandshakeMessages struct { DstMsgChannelOpenConfirm ChannelMessageCache } +type pathEndChannelCloseMessages struct { + Src *pathEndRuntime + Dst *pathEndRuntime + SrcMsgChannelPreInit ChannelMessageCache + SrcMsgChannelCloseInit ChannelMessageCache + DstMsgChannelCloseConfirm ChannelMessageCache +} + type pathEndPacketFlowResponse struct { SrcMessages []packetIBCMessage DstMessages []packetIBCMessage
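
Taken together, the processor changes above mean a channel close can be driven programmatically with the same termination machinery the CLI uses. The following is a condensed sketch of that usage, assuming the values already in scope in the new CloseChannel body in relayer/channel.go (c and dst are the two *Chain ends; pathName, memo, srcChanID, and srcPortID are the CLI arguments); it mirrors the code added above rather than defining any new API:

	// Run an event processor until shouldTerminate observes the
	// ChannelCloseConfirm for the target channel, or ctx expires.
	err := processor.NewEventProcessor().
		WithChainProcessors(
			c.chainProcessor(c.log, nil),
			dst.chainProcessor(c.log, nil),
		).
		WithPathProcessors(processor.NewPathProcessor(
			c.log,
			processor.NewPathEnd(pathName, c.PathEnd.ChainID, c.PathEnd.ClientID, "", []processor.ChainChannelKey{}),
			processor.NewPathEnd(pathName, dst.PathEnd.ChainID, dst.PathEnd.ClientID, "", []processor.ChainChannelKey{}),
			nil,
			memo,
			DefaultClientUpdateThreshold,
			DefaultFlushInterval,
		)).
		WithInitialBlockHistory(0).
		WithMessageLifecycle(&processor.ChannelCloseLifecycle{
			SrcChainID:   c.PathEnd.ChainID,
			SrcChannelID: srcChanID,
			SrcPortID:    srcPortID,
			SrcConnID:    c.PathEnd.ConnectionID,
			DstConnID:    dst.PathEnd.ConnectionID,
		}).
		Build().
		Run(ctx)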
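On the CLI side, the piece the test pins down is the positional argument order of the new `tx channel-close` command: path name, then source channel ID, then source port ID. A sketch of the same invocation from test code, mirroring the r.Exec call in TestScenarioICAChannelClose (srcChannelID and srcPortID stand in for values queried from the chain, not hard-coded IDs):

	// Request a close of the given channel end; the relayer flushes any
	// pending packets first (see CloseChannel above), then runs the close
	// handshake to completion. res.Err and res.ExitCode report failures
	// from the underlying `tx channel-close` invocation.
	res := r.Exec(ctx, eRep, []string{
		"tx", "channel-close", pathName, srcChannelID, srcPortID,
	}, nil)
	require.NoError(t, res.Err)
	require.Zero(t, res.ExitCode)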