
Use peermanager scores for blocksync peers and don't error out on block mismatch #162

Merged
merged 21 commits on Oct 31, 2023
26 changes: 24 additions & 2 deletions internal/blocksync/pool.go
@@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/internal/p2p"
Contributor: Nit: fix lint issue
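
For reference, a goimports-style grouping of this block (standard library first, third-party in its own group) would look like the following sketch, built only from the imports visible in this hunk:

```go
import (
	"context"
	"errors"
	"fmt"
	"math"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/tendermint/tendermint/internal/p2p"
)
```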

"math"
"sort"
"sync"
"sync/atomic"
"time"
@@ -80,6 +82,7 @@ type BlockPool struct {
height int64 // the lowest key in requesters.
// peers
peers map[types.NodeID]*bpPeer
peerManager *p2p.PeerManager
maxPeerHeight int64 // the biggest reported height

// atomic
@@ -101,8 +104,8 @@ func NewBlockPool(
start int64,
requestsCh chan<- BlockRequest,
errorsCh chan<- peerError,
peerManager *p2p.PeerManager,
) *BlockPool {

bp := &BlockPool{
logger: logger,
peers: make(map[types.NodeID]*bpPeer),
@@ -113,6 +116,7 @@
requestsCh: requestsCh,
errorsCh: errorsCh,
lastSyncRate: 0,
peerManager: peerManager,
}
bp.BaseService = *service.NewBaseService(logger, "BlockPool", bp)
return bp
@@ -408,13 +412,31 @@ func (pool *BlockPool) updateMaxPeerHeight() {
pool.maxPeerHeight = max
}

func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
// Generate a sorted list
sortedPeers := make([]types.NodeID, 0, len(peers))

for peer := range peers {
sortedPeers = append(sortedPeers, peer)
}
sort.Slice(sortedPeers, func(i, j int) bool {
return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
})
return sortedPeers
}

// Pick an available peer with the given height available.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

for _, peer := range pool.peers {
// Generate a sorted list
sortedPeers := pool.getSortedPeers(pool.peers)
fmt.Printf("PSUDEBUG - block sync with sorted peers: %v\n", sortedPeers)
Contributor: Shall we remove this debug log?
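
If the ordering is still useful for troubleshooting, one option is to route it through the pool's logger at debug level instead of stdout (a sketch, assuming the `logger` field this type already carries):

```go
// Debug-level log instead of fmt.Printf, so it is filtered out in production.
pool.logger.Debug("block sync peer order", "peers", fmt.Sprintf("%v", sortedPeers))
```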

for _, nodeId := range sortedPeers {
Contributor: A few further optimizations we can do:

1. We should probably avoid using the low-score peers, for example the ones with score 0
2. We should do some random shuffling so that we aren't always targeting the same few top peers (see the sketch below)
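
A minimal sketch of both ideas, assuming the `BlockPool` fields introduced in this diff plus an `mrand "math/rand"` import; shuffling before a stable sort leaves peers with equal scores in random order while still ranking strictly by score:

```go
// Sketch: skip zero-score peers and break score ties randomly.
func (pool *BlockPool) getSortedPeers(peers map[types.NodeID]*bpPeer) []types.NodeID {
	sortedPeers := make([]types.NodeID, 0, len(peers))
	for peer := range peers {
		if pool.peerManager.Score(peer) > 0 { // 1. drop score-0 peers
			sortedPeers = append(sortedPeers, peer)
		}
	}
	// 2. Shuffle first, then sort stably by score: peers with equal
	// scores keep their shuffled (random) relative order.
	mrand.Shuffle(len(sortedPeers), func(i, j int) {
		sortedPeers[i], sortedPeers[j] = sortedPeers[j], sortedPeers[i]
	})
	sort.SliceStable(sortedPeers, func(i, j int) bool {
		return pool.peerManager.Score(sortedPeers[i]) > pool.peerManager.Score(sortedPeers[j])
	})
	return sortedPeers
}
```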

peer := pool.peers[nodeId]
pool.peerManager.Score(peer.id)
Contributor: Why do we get the score here without using it?

if peer.didTimeout {
pool.removePeer(peer.id)
continue
69 changes: 60 additions & 9 deletions internal/blocksync/pool_test.go
@@ -2,16 +2,20 @@ package blocksync

import (
"context"
"crypto/rand"
"encoding/hex"
"fmt"
mrand "math/rand"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"

"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/types"
)

@@ -24,6 +28,7 @@ type testPeer struct {
base int64
height int64
inputChan chan inputData // make sure each peer's data is sequential
score p2p.PeerScore
}

type inputData struct {
@@ -70,17 +75,42 @@ func (ps testPeers) stop() {
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := types.NodeID(tmrand.Str(12))
bytes := make([]byte, 20)
if _, err := rand.Read(bytes); err != nil {
panic(err)
}
peerID := types.NodeID(hex.EncodeToString(bytes))
height := minHeight + mrand.Int63n(maxHeight-minHeight)
base := minHeight + int64(i)
if base > height {
base = height
}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10)}
peers[peerID] = testPeer{peerID, base, height, make(chan inputData, 10), 1}
}
return peers
}

func makePeerManager(peers map[types.NodeID]testPeer) *p2p.PeerManager {
selfKey := ed25519.GenPrivKeyFromSecret([]byte{0xf9, 0x1b, 0x08, 0xaa, 0x38, 0xee, 0x34, 0xdd})
selfID := types.NodeIDFromPubKey(selfKey.PubKey())
peerScores := make(map[types.NodeID]p2p.PeerScore)
for nodeId, peer := range peers {
peerScores[nodeId] = peer.score
}
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: peerScores,
MaxConnected: 1,
MaxConnectedUpgrade: 2,
}, p2p.NopMetrics())
if err != nil {
panic(err)
}
for nodeId := range peers {
_, err := peerManager.Add(p2p.NodeAddress{Protocol: "memory", NodeID: nodeId})
if err != nil {
panic(err)
}
}
return peerManager
}

func TestBlockPoolBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -89,7 +119,7 @@ func TestBlockPoolBasic(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), start, requestsCh, errorsCh, makePeerManager(peers))

if err := pool.Start(ctx); err != nil {
t.Error(err)
@@ -147,7 +177,7 @@ func TestBlockPoolTimeout(t *testing.T) {
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(logger, start, requestsCh, errorsCh)
pool := NewBlockPool(logger, start, requestsCh, errorsCh, makePeerManager(peers))
err := pool.Start(ctx)
if err != nil {
t.Error(err)
@@ -205,12 +235,12 @@ func TestBlockPoolRemovePeer(t *testing.T) {
for i := 0; i < 10; i++ {
peerID := types.NodeID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData)}
peers[peerID] = testPeer{peerID, 0, height, make(chan inputData), 1}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)

pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, nil)
err := pool.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); pool.Wait() })
@@ -235,3 +265,24 @@

assert.EqualValues(t, 0, pool.MaxPeerHeight())
}

func TestSortedPeers(t *testing.T) {
peers := make(testPeers, 10)
peerIdA := types.NodeID(strings.Repeat("a", 40))
peerIdB := types.NodeID(strings.Repeat("b", 40))
peerIdC := types.NodeID(strings.Repeat("c", 40))

peers[peerIdA] = testPeer{peerIdA, 0, 1, make(chan inputData), 11}
peers[peerIdB] = testPeer{peerIdB, 0, 1, make(chan inputData), 10}
peers[peerIdC] = testPeer{peerIdC, 0, 1, make(chan inputData), 13}

requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(log.NewNopLogger(), 1, requestsCh, errorsCh, makePeerManager(peers))
// add peers
for peerID, peer := range peers {
pool.SetPeerRange(peerID, peer.base, peer.height)
}
// Peers should be sorted by score via peerManager
assert.Equal(t, []types.NodeID{peerIdC, peerIdA, peerIdB}, pool.getSortedPeers(pool.peers))
}
9 changes: 6 additions & 3 deletions internal/blocksync/reactor.go
@@ -81,8 +81,9 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool

peerEvents p2p.PeerEventSubscriber
peerManager *p2p.PeerManager
channel *p2p.Channel

requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -105,6 +106,7 @@ func NewReactor(
store *store.BlockStore,
consReactor consensusReactor,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
@@ -119,6 +121,7 @@
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
peerEvents: peerEvents,
peerManager: peerManager,
metrics: metrics,
eventBus: eventBus,
restartCh: restartCh,
@@ -159,7 +162,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {

requestsCh := make(chan BlockRequest, maxTotalRequesters)
errorsCh := make(chan peerError, maxPeerErrBuffer) // NOTE: The capacity should be larger than the peer count.
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh)
r.pool = NewBlockPool(r.logger, startHeight, requestsCh, errorsCh, r.peerManager)
r.requestsCh = requestsCh
r.errorsCh = errorsCh

80 changes: 41 additions & 39 deletions internal/blocksync/reactor_test.go
@@ -106,6 +106,7 @@ func makeReactor(
privVal types.PrivValidator,
channelCreator p2p.ChannelCreator,
peerEvents p2p.PeerEventSubscriber,
peerManager *p2p.PeerManager,
restartChan chan struct{},
selfRemediationConfig *config.SelfRemediationConfig,
) *Reactor {
@@ -158,6 +159,7 @@
blockStore,
nil,
peerEvents,
peerManager,
true,
consensus.NopMetrics(),
nil, // eventbus, can be nil
@@ -203,6 +205,7 @@ func (rts *reactorTestSuite) addNode(
privVal,
chCreator,
peerEvents,
rts.network.Nodes[nodeID].PeerManager,
restartChan,
config.DefaultSelfRemediationConfig(),
)
@@ -354,49 +357,49 @@ func (m *MockBlockStore) Height() int64 {
func TestAutoRestartIfBehind(t *testing.T) {
t.Parallel()
tests := []struct {
name string
blocksBehindThreshold uint64
blocksBehindCheckInterval time.Duration
selfHeight int64
maxPeerHeight int64
isBlockSync bool
restartExpected bool
}{
{
name: "Should not restart if blocksBehindThreshold is 0",
blocksBehindThreshold: 0,
name: "Should not restart if blocksBehindThreshold is 0",
blocksBehindThreshold: 0,
blocksBehindCheckInterval: 10 * time.Millisecond,
selfHeight: 100,
maxPeerHeight: 200,
isBlockSync: false,
restartExpected: false,
selfHeight: 100,
maxPeerHeight: 200,
isBlockSync: false,
restartExpected: false,
},
{
name: "Should not restart if behindHeight is less than threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
name: "Should not restart if behindHeight is less than threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 140,
isBlockSync: false,
restartExpected: false,
maxPeerHeight: 140,
isBlockSync: false,
restartExpected: false,
},
{
name: "Should restart if behindHeight is greater than or equal to threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
name: "Should restart if behindHeight is greater than or equal to threshold",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 160,
isBlockSync: false,
restartExpected: true,
maxPeerHeight: 160,
isBlockSync: false,
restartExpected: true,
},
{
name: "Should not restart if blocksync",
blocksBehindThreshold: 50,
selfHeight: 100,
name: "Should not restart if blocksync",
blocksBehindThreshold: 50,
selfHeight: 100,
blocksBehindCheckInterval: 10 * time.Millisecond,
maxPeerHeight: 160,
isBlockSync: true,
restartExpected: false,
maxPeerHeight: 160,
isBlockSync: true,
restartExpected: false,
},
}

@@ -407,21 +410,20 @@ func TestAutoRestartIfBehind(t *testing.T) {
mockBlockStore.On("Height").Return(tt.selfHeight)

blockPool := &BlockPool{
logger: log.TestingLogger(),
height: tt.selfHeight,
maxPeerHeight: tt.maxPeerHeight,
}

restartChan := make(chan struct{}, 1)
r := &Reactor{
logger: log.TestingLogger(),
store: mockBlockStore,
pool: blockPool,
blocksBehindThreshold: tt.blocksBehindThreshold,
blocksBehindCheckInterval: tt.blocksBehindCheckInterval,
restartCh: restartChan,
blockSync: newAtomicBool(tt.isBlockSync),
}

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
1 change: 1 addition & 0 deletions node/node.go
@@ -367,6 +367,7 @@ func makeNode(
blockStore,
csReactor,
peerManager.Subscribe,
peerManager,
blockSync && !stateSync && !shoulddbsync,
nodeMetrics.consensus,
eventBus,