From 57860637e3682097833bac44a4cab4bd3a7e52bd Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 6 Oct 2023 16:30:57 -0700 Subject: [PATCH 01/19] Use peermanager scores for blocksync peers --- internal/blocksync/pool.go | 24 ++++++++- internal/blocksync/pool_test.go | 69 ++++++++++++++++++++++---- internal/blocksync/reactor.go | 9 ++-- internal/blocksync/reactor_test.go | 80 +++++++++++++++--------------- node/node.go | 1 + 5 files changed, 130 insertions(+), 53 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 2a315eadd..192e0f507 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "github.com/tendermint/tendermint/internal/p2p" "math" + "sort" "sync" "sync/atomic" "time" @@ -80,6 +82,7 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[types.NodeID]*bpPeer + peerManager *p2p.PeerManager maxPeerHeight int64 // the biggest reported height // atomic @@ -101,8 +104,8 @@ func NewBlockPool( start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError, + peerManager *p2p.PeerManager, ) *BlockPool { - bp := &BlockPool{ logger: logger, peers: make(map[types.NodeID]*bpPeer), @@ -113,6 +116,7 @@ func NewBlockPool( requestsCh: requestsCh, errorsCh: errorsCh, lastSyncRate: 0, + peerManager: peerManager, } bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp) return bp @@ -408,13 +412,29 @@ func (pool *BlockPool) updateMaxPeerHeight() { pool.maxPeerHeight = max } +func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID { + // Generate a sorted list + sortedPeers := make([]types.NodeID, 0, len(peers)) + for peer := range peers { + sortedPeers = append(sortedPeers, peer) + } + sort.Slice(sortedPeers, func(i, j int) bool { + return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j]) + }) + return sortedPeers +} + // Pick an available peer with the given height available. // If no peers are available, returns nil. 
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() - for _, peer := range pool.peers { + // Generate a sorted list + sortedPeers := pool.getSortedPeers(pool.peers) + for _, nodeId := range sortedPeers { + peer := pool.peers[nodeId] + pool.peerManager.Score(peer.id) if peer.didTimeout { pool.removePeer(peer.id) continue diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go index 3c47b4a64..fab63e5a1 100644 --- a/internal/blocksync/pool_test.go +++ b/internal/blocksync/pool_test.go @@ -2,16 +2,20 @@ package blocksync import ( "context" + "crypto/rand" + "encoding/hex" "fmt" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/crypto/ed25519" + "github.com/tendermint/tendermint/internal/p2p" + dbm "github.com/tendermint/tm-db" mrand "math/rand" + "strings" "testing" "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/libs/log" - tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/types" ) @@ -24,6 +28,7 @@ type testPeer struct { base int64 height int64 inputChan chan inputData // make sure each peer's data is sequential + score p2p.PeerScore } type inputData struct { @@ -70,17 +75,42 @@ func (ps testPeers) stop() { func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { peers := make(testPeers, numPeers) for i := 0; i < numPeers; i++ { - peerID := types.NodeID(tmrand.Str(12)) + bytes := make([]byte, 20) + if _, err := rand.Read(bytes); err != nil { + panic(err) + } + peerID := types.NodeID(hex.EncodeToString(bytes)) height := minHeight + mrand.Int63n(maxHeight-minHeight) base := minHeight + int64(i) if base > height { base = height } - peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)} + peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1} } return peers } +func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager { + selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd}) + selfID := types.NodeIDFromPubKey(selfKey.PubKey()) + peerScores := make(map[types.NodeID]p2p.PeerScore) + for nodeId, peer := range peers { + peerScores[nodeId] = peer.score + + } + peerManager, _ := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{ + PeerScores: peerScores, + MaxConnected: 1, + MaxConnectedUpgrade: 2, + }, p2p.NopMetrics()) + for nodeId, _ := range peers { + _, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeId}) + if err != nil { + panic(err) + } + } + return peerManager +} func TestBlockPoolBasic(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -89,7 +119,7 @@ func TestBlockPoolBasic(t *testing.T) { peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh) + pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers)) if err := pool.Start(ctx); err != nil { t.Error(err) @@ -147,7 +177,7 @@ func TestBlockPoolTimeout(t *testing.T) { peers := makePeers(10, start+1, 1000) errorsCh := make(chan peerError, 1000) requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(logger, start, requestsCh, errorsCh) + pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers)) err := pool.Start(ctx) if err != 
nil {
		t.Error(err)
@@ -205,12 +235,12 @@ func TestBlockPoolRemovePeer(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		peerID := types.NodeID(fmt.Sprintf("%d", i+1))
 		height := int64(i + 1)
-		peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
+		peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
 	}

 	requestsCh := make(chan BlockRequest)
 	errorsCh := make(chan peerError)
-	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
+	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, nil)
 	err := pool.Start(ctx)
 	require.NoError(t, err)
 	t.Cleanup(func() { cancel(); pool.Wait() })
@@ -235,3 +265,24 @@ func TestBlockPoolRemovePeer(t *testing.T) {

 	assert.EqualValues(t, 0, pool.MaxPeerHeight())
 }
+
+func TestSortedPeers(t *testing.T) {
+	peers := make(testPeers, 10)
+	peerIdA := types.NodeID(strings.Repeat("a", 40))
+	peerIdB := types.NodeID(strings.Repeat("b", 40))
+	peerIdC := types.NodeID(strings.Repeat("c", 40))
+
+	peers[peerIdA] = testPeer{peerIdA, 0, 1, make(chan inputData), 11}
+	peers[peerIdB] = testPeer{peerIdB, 0, 1, make(chan inputData), 10}
+	peers[peerIdC] = testPeer{peerIdC, 0, 1, make(chan inputData), 13}
+
+	requestsCh := make(chan BlockRequest)
+	errorsCh := make(chan peerError)
+	pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
+	// add peers
+	for peerID, peer := range peers {
+		pool.SetPeerRange(peerID, peer.base, peer.height)
+	}
+	// Peers should be sorted by score via peerManager
+	assert.Equal(t, []types.NodeID{peerIdC, peerIdA, peerIdB}, pool.getSortedPeers(pool.peers))
+}
diff --git a/internal/blocksync/reactor.go b/internal/blocksync/reactor.go
index 9935f7f33..caa3d213b 100644
--- a/internal/blocksync/reactor.go
+++ b/internal/blocksync/reactor.go
@@ -81,8 +81,9 @@ type Reactor struct {
 	consReactor consensusReactor
 	blockSync   *atomicBool

-	peerEvents p2p.PeerEventSubscriber
-	channel    *p2p.Channel
+	peerEvents  p2p.PeerEventSubscriber
+	peerManager *p2p.PeerManager
+	channel     *p2p.Channel

 	requestsCh <-chan BlockRequest
 	errorsCh   <-chan peerError
@@ -105,6 +106,7 @@ func NewReactor(
 	store *store.BlockStore,
 	consReactor consensusReactor,
 	peerEvents p2p.PeerEventSubscriber,
+	peerManager *p2p.PeerManager,
 	blockSync bool,
 	metrics *consensus.Metrics,
 	eventBus *eventbus.EventBus,
@@ -119,6 +121,7 @@ func NewReactor(
 		consReactor: consReactor,
 		blockSync:   newAtomicBool(blockSync),
 		peerEvents:  peerEvents,
+		peerManager: peerManager,
 		metrics:     metrics,
 		eventBus:    eventBus,
 		restartCh:   restartCh,
@@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
 	requestsCh := make(chan BlockRequest, maxTotalRequesters)
 	errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
- r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh) + r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager) r.requestsCh = requestsCh r.errorsCh = errorsCh diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 09c6fedbe..0dfd94b80 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -106,6 +106,7 @@ func makeReactor( privVal types.PrivValidator, channelCreator p2p.ChannelCreator, peerEvents p2p.PeerEventSubscriber, + peerManager *p2p.PeerManager, restartChan chan struct{}, selfRemediationConfig *config.SelfRemediationConfig, ) *Reactor { @@ -158,6 +159,7 @@ func makeReactor( blockStore, nil, peerEvents, + peerManager, true, consensus.NopMetrics(), nil, // eventbus, can be nil @@ -203,6 +205,7 @@ func (rts *reactorTestSuite) addNode( privVal, chCreator, peerEvents, + rts.network.Nodes[nodeID].PeerManager, restartChan, config.DefaultSelfRemediationConfig(), ) @@ -354,49 +357,49 @@ func (m *MockBlockStore) Height() int64 { func TestAutoRestartIfBehind(t *testing.T) { t.Parallel() tests := []struct { - name string - blocksBehindThreshold uint64 + name string + blocksBehindThreshold uint64 blocksBehindCheckInterval time.Duration - selfHeight int64 - maxPeerHeight int64 - isBlockSync bool - restartExpected bool + selfHeight int64 + maxPeerHeight int64 + isBlockSync bool + restartExpected bool }{ { - name: "Should not restart if blocksBehindThreshold is 0", - blocksBehindThreshold: 0, + name: "Should not restart if blocksBehindThreshold is 0", + blocksBehindThreshold: 0, blocksBehindCheckInterval: 10 * time.Millisecond, - selfHeight: 100, - maxPeerHeight: 200, - isBlockSync: false, - restartExpected: false, + selfHeight: 100, + maxPeerHeight: 200, + isBlockSync: false, + restartExpected: false, }, { - name: "Should not restart if behindHeight is less than threshold", - blocksBehindThreshold: 50, - selfHeight: 100, + name: "Should not restart if behindHeight is less than threshold", + blocksBehindThreshold: 50, + selfHeight: 100, blocksBehindCheckInterval: 10 * time.Millisecond, - maxPeerHeight: 140, - isBlockSync: false, - restartExpected: false, + maxPeerHeight: 140, + isBlockSync: false, + restartExpected: false, }, { - name: "Should restart if behindHeight is greater than or equal to threshold", - blocksBehindThreshold: 50, - selfHeight: 100, + name: "Should restart if behindHeight is greater than or equal to threshold", + blocksBehindThreshold: 50, + selfHeight: 100, blocksBehindCheckInterval: 10 * time.Millisecond, - maxPeerHeight: 160, - isBlockSync: false, - restartExpected: true, + maxPeerHeight: 160, + isBlockSync: false, + restartExpected: true, }, { - name: "Should not restart if blocksync", - blocksBehindThreshold: 50, - selfHeight: 100, + name: "Should not restart if blocksync", + blocksBehindThreshold: 50, + selfHeight: 100, blocksBehindCheckInterval: 10 * time.Millisecond, - maxPeerHeight: 160, - isBlockSync: true, - restartExpected: false, + maxPeerHeight: 160, + isBlockSync: true, + restartExpected: false, }, } @@ -407,21 +410,20 @@ func TestAutoRestartIfBehind(t *testing.T) { mockBlockStore.On("Height").Return(tt.selfHeight) blockPool := &BlockPool{ - logger: log.TestingLogger(), - height: tt.selfHeight, + logger: log.TestingLogger(), + height: tt.selfHeight, maxPeerHeight: tt.maxPeerHeight, - } restartChan := make(chan struct{}, 1) r := &Reactor{ - logger: log.TestingLogger(), - store: mockBlockStore, - pool: blockPool, - blocksBehindThreshold: 
tt.blocksBehindThreshold,
+				logger:                    log.TestingLogger(),
+				store:                     mockBlockStore,
+				pool:                      blockPool,
+				blocksBehindThreshold:     tt.blocksBehindThreshold,
 				blocksBehindCheckInterval: tt.blocksBehindCheckInterval,
-				restartCh:                 restartChan,
-				blockSync:                 newAtomicBool(tt.isBlockSync),
+				restartCh: restartChan,
+				blockSync: newAtomicBool(tt.isBlockSync),
 			}

 			ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
diff --git a/node/node.go b/node/node.go
index 2b2a0c9e8..0cd8a62bf 100644
--- a/node/node.go
+++ b/node/node.go
@@ -367,6 +367,7 @@ func makeNode(
 		blockStore,
 		csReactor,
 		peerManager.Subscribe,
+		peerManager,
 		blockSync && !stateSync && !shoulddbsync,
 		nodeMetrics.consensus,
 		eventBus,

From 1ac69bc824ff219a2ac8d04ba218cb31fe08ea94 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Mon, 9 Oct 2023 14:25:33 -0700
Subject: [PATCH 02/19] Add debug

---
 internal/blocksync/pool.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index 192e0f507..00fedb91e 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -415,6 +415,7 @@ func (pool *BlockPool) updateMaxPeerHeight() {
 func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
 	// Generate a sorted list
 	sortedPeers := make([]types.NodeID, 0, len(peers))
+
 	for peer := range peers {
 		sortedPeers = append(sortedPeers, peer)
 	}
@@ -432,6 +433,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {

 	// Generate a sorted list
 	sortedPeers := pool.getSortedPeers(pool.peers)
+	fmt.Printf("PSUDEBUG - block sync with sorted peers: %v\n", sortedPeers)
 	for _, nodeId := range sortedPeers {
 		peer := pool.peers[nodeId]
 		pool.peerManager.Score(peer.id)

From fe9f6e054ad31d665017a5c18e29793a38131b12 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Mon, 16 Oct 2023 14:30:43 -0700
Subject: [PATCH 03/19] Randomize

---
 internal/blocksync/pool.go | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index 00fedb91e..ff9301e7e 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"math"
+	"math/rand"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -419,6 +420,7 @@ func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.No
 	for peer := range peers {
 		sortedPeers = append(sortedPeers, peer)
 	}
+	// Sort from high to low score
 	sort.Slice(sortedPeers, func(i, j int) bool {
 		return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
 	})
 	return sortedPeers
@@ -433,10 +435,19 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
 	// Generate a sorted list
 	sortedPeers := pool.getSortedPeers(pool.peers)
-	fmt.Printf("PSUDEBUG - block sync with sorted peers: %v\n", sortedPeers)
+	var goodPeers []types.NodeID
+	// Remove peers with 0 score and shuffle list
+	for _, peer := range sortedPeers {
+		if pool.peerManager.Score(peer) == 0 {
+			break
+		}
+		goodPeers = append(goodPeers, peer)
+	}
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })
+
 	for _, nodeId := range sortedPeers {
 		peer := pool.peers[nodeId]
-		pool.peerManager.Score(peer.id)
 		if peer.didTimeout {
 			pool.removePeer(peer.id)
 			continue
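Patches 01-03 together define the selection policy: rank candidate peers by their PeerManager score, cut the zero-score tail, then shuffle the survivors so requests spread across good peers instead of always hitting the top-scored one. A standalone sketch of that ordering logic follows; this is a simplified model, not the tendermint API — NodeID, PeerScore, and the scores map stand in for types.NodeID and p2p.PeerManager.Score:

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

type NodeID string
type PeerScore uint8

// pickOrder mirrors the pool's policy: highest score first, zero-score
// peers dropped, and the surviving peers shuffled so load is spread
// instead of always hammering the single top-scored peer.
func pickOrder(scores map[NodeID]PeerScore, rng *rand.Rand) []NodeID {
	sorted := make([]NodeID, 0, len(scores))
	for id := range scores {
		sorted = append(sorted, id)
	}
	// Sort from high to low score, as getSortedPeers does.
	sort.Slice(sorted, func(i, j int) bool { return scores[sorted[i]] > scores[sorted[j]] })

	var good []NodeID
	for _, id := range sorted {
		if scores[id] == 0 {
			break // list is sorted, so everything after this is 0 too
		}
		good = append(good, id)
	}
	rng.Shuffle(len(good), func(i, j int) { good[i], good[j] = good[j], good[i] })
	return good
}

func main() {
	scores := map[NodeID]PeerScore{"a": 11, "b": 10, "c": 13, "d": 0}
	fmt.Println(pickOrder(scores, rand.New(rand.NewSource(1)))) // some order of [c a b]; d is dropped
}

Shuffling only the positive-score prefix keeps zero-score peers out of the candidate set entirely, rather than merely deprioritizing them.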
From 08f81636f2dfa6752e080d38725aac79f2186661 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Mon, 16 Oct 2023 14:58:15 -0700
Subject: [PATCH 04/19] debug

---
 internal/blocksync/pool.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index ff9301e7e..70c1d8699 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -641,6 +641,7 @@ func (*bpRequester) OnStop() {}
 func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID types.NodeID) bool {
 	bpr.mtx.Lock()
 	if bpr.block != nil || bpr.peerID != peerID {
+		fmt.Printf("\nPSUDEBUG no match, existing block %v, want to set block %v peerId doesn't equal bprPeer %v, peer %v", bpr.block, block, bpr.peerID, peerID)
 		bpr.mtx.Unlock()
 		return false
 	}

From 2f4517f24025f2d60c6b1c3baa9adb01023dd078 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Thu, 19 Oct 2023 14:02:57 -0700
Subject: [PATCH 05/19] use state to filter

---
 internal/blocksync/pool.go | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index 70c1d8699..b2d43b60d 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -438,10 +438,12 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
 	var goodPeers []types.NodeID
 	// Remove peers with 0 score and shuffle list
 	for _, peer := range sortedPeers {
+		if pool.peerManager.State(peer) == "ready,connected" {
+			goodPeers = append(goodPeers, peer)
+		}
 		if pool.peerManager.Score(peer) == 0 {
 			break
 		}
-		goodPeers = append(goodPeers, peer)
 	}
 	rand.Seed(time.Now().UnixNano())
 	rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] })
@@ -641,7 +643,6 @@ func (*bpRequester) OnStop() {}
 func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCommit, peerID types.NodeID) bool {
 	bpr.mtx.Lock()
 	if bpr.block != nil || bpr.peerID != peerID {
-		fmt.Printf("\nPSUDEBUG no match, existing block %v, want to set block %v peerId doesn't equal bprPeer %v, peer %v", bpr.block, block, bpr.peerID, peerID)
 		bpr.mtx.Unlock()
 		return false
 	}

From bfc637c0add1a854b6e84c0c281173ac3dd78dad Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Thu, 19 Oct 2023 14:20:31 -0700
Subject: [PATCH 06/19] debug

---
 internal/blocksync/pool.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index b2d43b60d..c070bbd41 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -33,7 +33,7 @@ eg, L = latency = 0.1s
 const (
 	requestInterval           = 2 * time.Millisecond
 	inactiveSleepInterval     = 1 * time.Second
-	maxTotalRequesters        = 600
+	maxTotalRequesters        = 50
 	maxPeerErrBuffer          = 1000
 	maxPendingRequests        = maxTotalRequesters
 	maxPendingRequestsPerPeer = 20

From 533f759d8b60e71c3c93bf2e42b9d02c807c5468 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Thu, 19 Oct 2023 14:34:14 -0700
Subject: [PATCH 07/19] debug

---
 internal/blocksync/pool.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index c070bbd41..b2d43b60d 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -33,7 +33,7 @@ eg, L = latency = 0.1s
 const (
 	requestInterval           = 2 * time.Millisecond
 	inactiveSleepInterval     = 1 * time.Second
-	maxTotalRequesters        = 50
+	maxTotalRequesters        = 600
 	maxPeerErrBuffer          = 1000
 	maxPendingRequests        = maxTotalRequesters
 	maxPendingRequestsPerPeer = 20
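Patch 05's filter means only peers the PeerManager reports as both ready and connected become request candidates, so the pool stops picking peers that are still dialing. A minimal sketch of the combined state-and-score filter, assuming score-descending input; the state and scores maps are stand-ins for p2p.PeerManager.State and p2p.PeerManager.Score, and unlike the patch this version applies the score cutoff before appending:

package main

import "fmt"

type NodeID string

// goodPeers keeps only peers that are fully connected and still have a
// positive score; sortedPeers is assumed to be ordered high-to-low by score.
func goodPeers(sortedPeers []NodeID, state map[NodeID]string, scores map[NodeID]uint8) []NodeID {
	var good []NodeID
	for _, id := range sortedPeers {
		if scores[id] == 0 {
			break // sorted high-to-low, so the rest are zero as well
		}
		// Only work with peers that are ready & connected (not dialing).
		if state[id] == "ready,connected" {
			good = append(good, id)
		}
	}
	return good
}

func main() {
	sorted := []NodeID{"c", "a", "b"}
	state := map[NodeID]string{"c": "ready,connected", "a": "dialing", "b": "ready,connected"}
	scores := map[NodeID]uint8{"c": 13, "a": 11, "b": 10}
	fmt.Println(goodPeers(sorted, state, scores)) // [c b]
}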
From 8fa86386b1ff665acd6622c81b3f8872b16dc936 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Thu, 19 Oct 2023 14:44:11 -0700
Subject: [PATCH 08/19] debug

---
 internal/blocksync/pool.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index b2d43b60d..bee8c5533 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -320,7 +320,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
 		}
 	} else {
 		err := errors.New("requester is different or block already exists")
-		pool.sendError(err, peerID)
+		//pool.sendError(err, peerID)
 		return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
 	}

From 1137ac4fb5377baa57c8a01d0ad914d5ab472b78 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Thu, 19 Oct 2023 15:06:07 -0700
Subject: [PATCH 09/19] debug

---
 internal/blocksync/pool.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index bee8c5533..af9c870de 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -50,7 +50,7 @@ const (
 	maxDiffBetweenCurrentAndReceivedBlockHeight = 100
 )

-var peerTimeout = 15 * time.Second // not const so we can override with tests
+var peerTimeout = 3 * time.Second // not const so we can override with tests

 /*
 Peers self report their heights when we join the block pool.
@@ -320,7 +320,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
 	} else {
 		err := errors.New("requester is different or block already exists")
-		//pool.sendError(err, peerID)
+		pool.sendError(err, peerID)
 		return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
 	}

From 2bc2fbbb74f186863acbc115302aafddd52741e3 Mon Sep 17 00:00:00 2001
From: Philip Su
Date: Mon, 23 Oct 2023 09:41:35 -0700
Subject: [PATCH 10/19] add comments

---
 internal/blocksync/pool.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go
index af9c870de..537f3a0be 100644
--- a/internal/blocksync/pool.go
+++ b/internal/blocksync/pool.go
@@ -320,7 +320,9 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
 	} else {
 		err := errors.New("requester is different or block already exists")
-		pool.sendError(err, peerID)
+		// Original behavior is to error out when there is a mismatch, which shuts down the entire reactor.
+ // Instead, make the reactor more robust and just log error + //pool.sendError(err, peerID) return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) } @@ -438,6 +440,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { var goodPeers []types.NodeID // Remove peers with 0 score and shuffle list for _, peer := range sortedPeers { + // We only want to work with peers that are ready & connected (not dialing) if pool.peerManager.State(peer) == "ready,connected" { goodPeers = append(goodPeers, peer) } From a1bd25908c7d10ff7112e033021209f1bfe04b38 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Tue, 24 Oct 2023 14:21:11 -0700 Subject: [PATCH 11/19] don't err --- internal/blocksync/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 537f3a0be..7185343c0 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -322,7 +322,7 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm err := errors.New("requester is different or block already exists") // Original behavior is to error out when there is a mismatch, which shuts down the entire reactor. // Instead, make the reactor more robust and just log error - //pool.sendError(err, peerID) + pool.sendError(err, peerID) return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) } From a83139b55242d7f1275eb44020e51b45cb29a88c Mon Sep 17 00:00:00 2001 From: Philip Su Date: Fri, 27 Oct 2023 15:30:33 -0700 Subject: [PATCH 12/19] revert timeout --- internal/blocksync/pool.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 7185343c0..3f9a970a3 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -50,7 +50,7 @@ const ( maxDiffBetweenCurrentAndReceivedBlockHeight = 100 ) -var peerTimeout = 3 * time.Second // not const so we can override with tests +var peerTimeout = 15 * time.Second // not const so we can override with tests /* Peers self report their heights when we join the block pool. @@ -320,8 +320,6 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm } } else { err := errors.New("requester is different or block already exists") - // Original behavior is to error out when there is a mismatch, which shuts down the entire reactor. 
- // Instead, make the reactor more robust and just log error pool.sendError(err, peerID) return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height) } From ae4b8bc244419afced88c752f6d972fe6e374ebe Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 30 Oct 2023 08:21:54 -0700 Subject: [PATCH 13/19] Add missing param --- test/e2e/node/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/e2e/node/main.go b/test/e2e/node/main.go index b1adcd109..85587dbf8 100644 --- a/test/e2e/node/main.go +++ b/test/e2e/node/main.go @@ -203,6 +203,7 @@ func startLightNode(ctx context.Context, logger log.Logger, cfg *Config) error { providers[0], providers[1:], dbs.New(lightDB), + 5*time.Minute, light.Logger(nodeLogger), ) if err != nil { From 3f0649372cc8f7fdaad5677889e80a2a9c5a62a6 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 30 Oct 2023 15:39:56 -0700 Subject: [PATCH 14/19] Remove flaky test --- internal/consensus/state_test.go | 71 ++++++++++++++++---------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 3dbdf8453..dd8a37a8f 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2452,44 +2452,45 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { ensureNewRound(t, newRoundCh, height, round+1) } +// TODO (psu): This test seems to be flaky, disable for now // 4 vals, 3 Prevotes for nil from the higher round. // What we want: // P0 waits for timeoutPropose in the next round before entering prevote -func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { - config := configSetup(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) - vs2, vs3, vs4 := vss[1], vss[2], vss[3] - height, round := cs1.roundState.Height(), cs1.roundState.Round() - - timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) - newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) - pv1, err := cs1.privValidator.GetPubKey(ctx) - require.NoError(t, err) - addr := pv1.Address() - voteCh := subscribeToVoter(ctx, t, cs1, addr) - - // start round - startTestRound(ctx, cs1, height, round) - ensureNewRound(t, newRoundCh, height, round) - - ensurePrevote(t, voteCh, height, round) - - incrementRound(vss[1:]...) 
- signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) - - round++ // moving to the next round - ensureNewRound(t, newRoundCh, height, round) - - rs := cs1.GetRoundState() - assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires - - ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) - - ensurePrevoteMatch(t, voteCh, height, round, nil) -} +//func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { +// config := configSetup(t) +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() +// +// cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) +// vs2, vs3, vs4 := vss[1], vss[2], vss[3] +// height, round := cs1.roundState.Height(), cs1.roundState.Round() +// +// timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) +// newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) +// pv1, err := cs1.privValidator.GetPubKey(ctx) +// require.NoError(t, err) +// addr := pv1.Address() +// voteCh := subscribeToVoter(ctx, t, cs1, addr) +// +// // start round +// startTestRound(ctx, cs1, height, round) +// ensureNewRound(t, newRoundCh, height, round) +// +// ensurePrevote(t, voteCh, height, round) +// +// incrementRound(vss[1:]...) +// signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) +// +// round++ // moving to the next round +// ensureNewRound(t, newRoundCh, height, round) +// +// rs := cs1.GetRoundState() +// assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires +// +// ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) +// +// ensurePrevoteMatch(t, voteCh, height, round, nil) +//} // 4 vals, 3 Precommits for nil from the higher round. 
// What we want: From a70ffd241061e5b102cc54689b0c63ae61c96b76 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Mon, 30 Oct 2023 17:18:26 -0700 Subject: [PATCH 15/19] fix nil --- internal/blocksync/pool_test.go | 11 +++-- internal/consensus/state_test.go | 71 ++++++++++++++++---------------- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/internal/blocksync/pool_test.go b/internal/blocksync/pool_test.go index fab63e5a1..27caad060 100644 --- a/internal/blocksync/pool_test.go +++ b/internal/blocksync/pool_test.go @@ -233,14 +233,19 @@ func TestBlockPoolRemovePeer(t *testing.T) { peers := make(testPeers, 10) for i := 0; i < 10; i++ { - peerID := types.NodeID(fmt.Sprintf("%d", i+1)) + var peerID types.NodeID + if i+1 == 10 { + peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 20)) + } else { + peerID = types.NodeID(strings.Repeat(fmt.Sprintf("%d", i+1), 40)) + } height := int64(i + 1) peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1} } requestsCh := make(chan BlockRequest) errorsCh := make(chan peerError) - pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, nil) + pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers)) err := pool.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); pool.Wait() }) @@ -255,7 +260,7 @@ func TestBlockPoolRemovePeer(t *testing.T) { assert.NotPanics(t, func() { pool.RemovePeer(types.NodeID("Superman")) }) // remove peer with biggest height - pool.RemovePeer(types.NodeID("10")) + pool.RemovePeer(types.NodeID(strings.Repeat("10", 20))) assert.EqualValues(t, 9, pool.MaxPeerHeight()) // remove all peers diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index dd8a37a8f..3dbdf8453 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2452,45 +2452,44 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { ensureNewRound(t, newRoundCh, height, round+1) } -// TODO (psu): This test seems to be flaky, disable for now // 4 vals, 3 Prevotes for nil from the higher round. // What we want: // P0 waits for timeoutPropose in the next round before entering prevote -//func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { -// config := configSetup(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) -// vs2, vs3, vs4 := vss[1], vss[2], vss[3] -// height, round := cs1.roundState.Height(), cs1.roundState.Round() -// -// timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) -// newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) -// pv1, err := cs1.privValidator.GetPubKey(ctx) -// require.NoError(t, err) -// addr := pv1.Address() -// voteCh := subscribeToVoter(ctx, t, cs1, addr) -// -// // start round -// startTestRound(ctx, cs1, height, round) -// ensureNewRound(t, newRoundCh, height, round) -// -// ensurePrevote(t, voteCh, height, round) -// -// incrementRound(vss[1:]...) 
-// signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) -// -// round++ // moving to the next round -// ensureNewRound(t, newRoundCh, height, round) -// -// rs := cs1.GetRoundState() -// assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires -// -// ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) -// -// ensurePrevoteMatch(t, voteCh, height, round, nil) -//} +func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { + config := configSetup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.roundState.Height(), cs1.roundState.Round() + + timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) + newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) + pv1, err := cs1.privValidator.GetPubKey(ctx) + require.NoError(t, err) + addr := pv1.Address() + voteCh := subscribeToVoter(ctx, t, cs1, addr) + + // start round + startTestRound(ctx, cs1, height, round) + ensureNewRound(t, newRoundCh, height, round) + + ensurePrevote(t, voteCh, height, round) + + incrementRound(vss[1:]...) + signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) + + round++ // moving to the next round + ensureNewRound(t, newRoundCh, height, round) + + rs := cs1.GetRoundState() + assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires + + ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) + + ensurePrevoteMatch(t, voteCh, height, round, nil) +} // 4 vals, 3 Precommits for nil from the higher round. // What we want: From e47270c3157a0daa414e364c1f6d961439711f62 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Tue, 31 Oct 2023 08:05:59 -0700 Subject: [PATCH 16/19] debug --- internal/consensus/state_test.go | 70 ++++++++++++++++---------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 3dbdf8453..40ccf6b84 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2455,41 +2455,41 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { // 4 vals, 3 Prevotes for nil from the higher round. // What we want: // P0 waits for timeoutPropose in the next round before entering prevote -func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { - config := configSetup(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) - vs2, vs3, vs4 := vss[1], vss[2], vss[3] - height, round := cs1.roundState.Height(), cs1.roundState.Round() - - timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) - newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) - pv1, err := cs1.privValidator.GetPubKey(ctx) - require.NoError(t, err) - addr := pv1.Address() - voteCh := subscribeToVoter(ctx, t, cs1, addr) - - // start round - startTestRound(ctx, cs1, height, round) - ensureNewRound(t, newRoundCh, height, round) - - ensurePrevote(t, voteCh, height, round) - - incrementRound(vss[1:]...) 
- signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) - - round++ // moving to the next round - ensureNewRound(t, newRoundCh, height, round) - - rs := cs1.GetRoundState() - assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires - - ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) - - ensurePrevoteMatch(t, voteCh, height, round, nil) -} +//func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { +// config := configSetup(t) +// ctx, cancel := context.WithCancel(context.Background()) +// defer cancel() +// +// cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) +// vs2, vs3, vs4 := vss[1], vss[2], vss[3] +// height, round := cs1.roundState.Height(), cs1.roundState.Round() +// +// timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) +// newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) +// pv1, err := cs1.privValidator.GetPubKey(ctx) +// require.NoError(t, err) +// addr := pv1.Address() +// voteCh := subscribeToVoter(ctx, t, cs1, addr) +// +// // start round +// startTestRound(ctx, cs1, height, round) +// ensureNewRound(t, newRoundCh, height, round) +// +// ensurePrevote(t, voteCh, height, round) +// +// incrementRound(vss[1:]...) +// signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) +// +// round++ // moving to the next round +// ensureNewRound(t, newRoundCh, height, round) +// +// rs := cs1.GetRoundState() +// assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires +// +// ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) +// +// ensurePrevoteMatch(t, voteCh, height, round, nil) +//} // 4 vals, 3 Precommits for nil from the higher round. // What we want: From 36d31f90d63a69a20ab7fadc3ba559da89cf6a04 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Tue, 31 Oct 2023 09:58:22 -0700 Subject: [PATCH 17/19] debug --- internal/blocksync/pool.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 3f9a970a3..49c8cf6b0 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -430,11 +430,11 @@ func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.No // Pick an available peer with the given height available. // If no peers are available, returns nil. 
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { - pool.mtx.Lock() - defer pool.mtx.Unlock() // Generate a sorted list + pool.mtx.Lock() sortedPeers := pool.getSortedPeers(pool.peers) + pool.mtx.Unlock() var goodPeers []types.NodeID // Remove peers with 0 score and shuffle list for _, peer := range sortedPeers { @@ -449,6 +449,8 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { rand.Seed(time.Now().UnixNano()) rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] }) + pool.mtx.Lock() + defer pool.mtx.Unlock() for _, nodeId := range sortedPeers { peer := pool.peers[nodeId] if peer.didTimeout { From ecf6e7f0178f439c4ef304e907cf0657eb9412f4 Mon Sep 17 00:00:00 2001 From: Philip Su Date: Tue, 31 Oct 2023 14:24:15 -0700 Subject: [PATCH 18/19] debug --- internal/blocksync/pool.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/blocksync/pool.go b/internal/blocksync/pool.go index 49c8cf6b0..3f9a970a3 100644 --- a/internal/blocksync/pool.go +++ b/internal/blocksync/pool.go @@ -430,11 +430,11 @@ func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.No // Pick an available peer with the given height available. // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { + pool.mtx.Lock() + defer pool.mtx.Unlock() // Generate a sorted list - pool.mtx.Lock() sortedPeers := pool.getSortedPeers(pool.peers) - pool.mtx.Unlock() var goodPeers []types.NodeID // Remove peers with 0 score and shuffle list for _, peer := range sortedPeers { @@ -449,8 +449,6 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { rand.Seed(time.Now().UnixNano()) rand.Shuffle(len(goodPeers), func(i, j int) { goodPeers[i], goodPeers[j] = goodPeers[j], goodPeers[i] }) - pool.mtx.Lock() - defer pool.mtx.Unlock() for _, nodeId := range sortedPeers { peer := pool.peers[nodeId] if peer.didTimeout { From a1493862227afeea930fb4f32e3dcf42b1b7f1ca Mon Sep 17 00:00:00 2001 From: Philip Su Date: Tue, 31 Oct 2023 14:25:54 -0700 Subject: [PATCH 19/19] debug --- internal/consensus/state_test.go | 70 ++++++++++++++++---------------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/internal/consensus/state_test.go b/internal/consensus/state_test.go index 40ccf6b84..3dbdf8453 100644 --- a/internal/consensus/state_test.go +++ b/internal/consensus/state_test.go @@ -2455,41 +2455,41 @@ func TestWaitingTimeoutOnNilPolka(t *testing.T) { // 4 vals, 3 Prevotes for nil from the higher round. // What we want: // P0 waits for timeoutPropose in the next round before entering prevote -//func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { -// config := configSetup(t) -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) -// vs2, vs3, vs4 := vss[1], vss[2], vss[3] -// height, round := cs1.roundState.Height(), cs1.roundState.Round() -// -// timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) -// newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) -// pv1, err := cs1.privValidator.GetPubKey(ctx) -// require.NoError(t, err) -// addr := pv1.Address() -// voteCh := subscribeToVoter(ctx, t, cs1, addr) -// -// // start round -// startTestRound(ctx, cs1, height, round) -// ensureNewRound(t, newRoundCh, height, round) -// -// ensurePrevote(t, voteCh, height, round) -// -// incrementRound(vss[1:]...) 
-// signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) -// -// round++ // moving to the next round -// ensureNewRound(t, newRoundCh, height, round) -// -// rs := cs1.GetRoundState() -// assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires -// -// ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) -// -// ensurePrevoteMatch(t, voteCh, height, round, nil) -//} +func TestWaitingTimeoutProposeOnNewRound(t *testing.T) { + config := configSetup(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cs1, vss := makeState(ctx, t, makeStateArgs{config: config}) + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.roundState.Height(), cs1.roundState.Round() + + timeoutWaitCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryTimeoutPropose) + newRoundCh := subscribe(ctx, t, cs1.eventBus, types.EventQueryNewRound) + pv1, err := cs1.privValidator.GetPubKey(ctx) + require.NoError(t, err) + addr := pv1.Address() + voteCh := subscribeToVoter(ctx, t, cs1, addr) + + // start round + startTestRound(ctx, cs1, height, round) + ensureNewRound(t, newRoundCh, height, round) + + ensurePrevote(t, voteCh, height, round) + + incrementRound(vss[1:]...) + signAddVotes(ctx, t, cs1, tmproto.PrevoteType, config.ChainID(), types.BlockID{}, vs2, vs3, vs4) + + round++ // moving to the next round + ensureNewRound(t, newRoundCh, height, round) + + rs := cs1.GetRoundState() + assert.Equal(t, rs.Step, cstypes.RoundStepPropose) // P0 does not prevote before timeoutPropose expires + + ensureNewTimeout(t, timeoutWaitCh, height, round, cs1.proposeTimeout(round).Milliseconds()) + + ensurePrevoteMatch(t, voteCh, height, round, nil) +} // 4 vals, 3 Precommits for nil from the higher round. // What we want:
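The net effect of the series on peer selection, once patch 18 reverts the split-lock experiment from patch 17, is that the whole pick happens under a single mutex hold: sort by score, keep ready-and-connected peers with positive scores, shuffle, then scan. A condensed sketch with simplified types; it ranges over the shuffled list, which appears to be the intent (the patches themselves still range over sortedPeers), and the real loop also applies per-peer pending limits and height checks not shown in these excerpts:

package main

import (
	"fmt"
	"math/rand"
	"sort"
	"sync"
)

type nodeID string

type peerView struct {
	id         nodeID
	score      uint8
	state      string
	didTimeout bool
}

type pool struct {
	mtx   sync.Mutex
	peers map[nodeID]*peerView
}

// pick mirrors the end state of pickIncrAvailablePeer: one lock around
// sorting, filtering, shuffling, and the final scan.
func (p *pool) pick() *peerView {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	sorted := make([]nodeID, 0, len(p.peers))
	for id := range p.peers {
		sorted = append(sorted, id)
	}
	// Sort from high to low score.
	sort.Slice(sorted, func(i, j int) bool { return p.peers[sorted[i]].score > p.peers[sorted[j]].score })

	var good []nodeID
	for _, id := range sorted {
		pv := p.peers[id]
		if pv.score == 0 {
			break
		}
		if pv.state == "ready,connected" {
			good = append(good, id)
		}
	}
	rand.Shuffle(len(good), func(i, j int) { good[i], good[j] = good[j], good[i] })

	for _, id := range good {
		pv := p.peers[id]
		if pv.didTimeout {
			continue // the real pool also removes the peer here
		}
		return pv
	}
	return nil
}

func main() {
	p := &pool{peers: map[nodeID]*peerView{
		"a": {id: "a", score: 11, state: "ready,connected"},
		"b": {id: "b", score: 0, state: "ready,connected"},
	}}
	if pv := p.pick(); pv != nil {
		fmt.Println("picked", pv.id)
	}
}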