Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

Bugfix post v0.32 #155

Merged
merged 6 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ program](https://hackerone.com/tendermint).
### IMPROVEMENTS:

### BUG FIXES:

2 changes: 1 addition & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

for _, option := range options {
option(conR)
Expand Down
26 changes: 21 additions & 5 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (cs *ConsensusState) updateToState(state sm.State) {
// We add timeoutCommit to allow transactions
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.config.Commit(tmtime.Now())
} else {
cs.StartTime = cs.config.Commit(cs.CommitTime)
Expand Down Expand Up @@ -756,17 +756,33 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
func (cs *ConsensusState) handleTxsAvailable() {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// we only need to do this for round 0
cs.enterNewRound(cs.Height, 0)
cs.enterPropose(cs.Height, 0)

// We only need to do this for round 0.
if cs.Round != 0 {
return
}

switch cs.Step {
case cstypes.RoundStepNewHeight: // timeoutCommit phase
if cs.needProofBlock(cs.Height) {
// enterPropose will be called by enterNewRound
return
}

// +1ms to ensure RoundStepNewRound timeout always happens after RoundStepNewHeight
timeoutCommit := cs.StartTime.Sub(tmtime.Now()) + 1*time.Millisecond
cs.scheduleTimeout(timeoutCommit, cs.Height, 0, cstypes.RoundStepNewRound)
case cstypes.RoundStepNewRound: // after timeoutCommit
cs.enterPropose(cs.Height, 0)
}
}

//-----------------------------------------------------------------------------
// State functions
// Used internally by handleTimeout and handleMsg to make state transitions

// Enter: `timeoutNewHeight` by startTime (commitTime+timeoutCommit),
// or, if SkipTimeout==true, after receiving all precommits from (height,round-1)
// or, if SkipTimeoutCommit==true, after receiving all precommits from (height,round-1)
// Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1)
// Enter: +2/3 precommits for nil at (height,round-1)
// Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round)
Expand Down
2 changes: 1 addition & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}

Expand Down
4 changes: 2 additions & 2 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ type CListMempool struct {

// Atomic integers
height int64 // the last block Update()'d to
rechecking int32 // for re-checking filtered txs on Update()
txsBytes int64 // total size of mempool, in bytes

rechecking int32 // for re-checking filtered txs on Update()

// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cache txCache
Expand Down
11 changes: 8 additions & 3 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}

// Reserve searches for the next unused ID and assignes it to the
// Reserve searches for the next unused ID and assigns it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
Expand Down Expand Up @@ -118,10 +118,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor {
mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR)
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR
}

// InitPeer implements Reactor by creating a state for the peer.
func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer {
memR.ids.ReserveForPeer(peer)
return peer
}

// SetLogger sets the Logger on the reactor and the underlying mempool.
func (memR *Reactor) SetLogger(l log.Logger) {
memR.Logger = l
Expand Down Expand Up @@ -157,7 +163,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *Reactor) AddPeer(peer p2p.Peer) {
memR.ids.ReserveForPeer(peer)
go memR.broadcastTxRoutine(peer)
}

Expand Down
18 changes: 18 additions & 0 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
ids.ReserveForPeer(peer)
})
}

func TestDontExhaustMaxActiveIDs(t *testing.T) {
config := cfg.TestConfig()
const N = 1
reactors := makeAndConnectReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
reactor := reactors[0]

for i := 0; i < maxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}
5 changes: 5 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func createTransport(config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.Nod
}

p2p.MultiplexTransportConnFilters(connFilters...)(transport)

// Limit the number of incoming connections.
max := config.P2P.MaxNumInboundPeers
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)

return transport, peerFilters
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
lastReceivedRequests: cmn.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
}
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
return r
}

Expand Down
16 changes: 14 additions & 2 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/pkg/errors"
"golang.org/x/net/netutil"

"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/p2p/conn"
Expand Down Expand Up @@ -122,11 +123,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.resolver = resolver }
}

// MultiplexTransportMaxIncomingConnections sets the maximum number of
// simultaneous connections (incoming). Default: 0 (unlimited)
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
}

// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
netAddr NetAddress
listener net.Listener
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections

acceptc chan accept
closec chan struct{}
Expand Down Expand Up @@ -240,6 +248,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return err
}

if mt.maxIncomingConnections > 0 {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}

mt.netAddr = addr
mt.listener = ln

Expand Down
45 changes: 45 additions & 0 deletions p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"net"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
}
}

func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
id := mt.nodeKey.ID()

MultiplexTransportMaxIncomingConnections(0)(mt)

addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}

if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}

errc := make(chan error)

go func() {
addr := NewNetAddress(id, mt.listener.Addr())

_, err := addr.Dial()
if err != nil {
errc <- err
return
}

close(errc)
}()

if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}

_, err = mt.Accept(peerConfig{})
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
t.Errorf("expected connection reset by peer error, got %v", err)
}
}

func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
Expand Down
8 changes: 6 additions & 2 deletions state/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,14 @@ func (txi *TxIndex) Search(queryStr string) ([]*types.TxResult, error) {
return nil, errors.Wrap(err, "error during searching for a hash in the query")
} else if ok {
res, err := txi.Get(hash)
if res == nil {
switch {
case err != nil:
return []*types.TxResult{}, errors.Wrap(err, "error while retrieving the result")
case res == nil:
return []*types.TxResult{}, nil
default:
return []*types.TxResult{res}, nil
}
return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result")
}

// conditions to skip because they're handled before "everything else"
Expand Down