Skip to content
This repository has been archived by the owner on Oct 15, 2024. It is now read-only.

Fix consensus failure when remote signer drops #153

Merged
merged 5 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 85 additions & 13 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/crypto"
tmevents "github.com/tendermint/tendermint/libs/events"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
Expand All @@ -31,6 +32,8 @@ var (
ErrInvalidProposalPOLRound = errors.New("Error invalid proposal POL round")
ErrAddingVote = errors.New("Error adding vote")
ErrVoteHeightMismatch = errors.New("Error vote height mismatch")

errPubKeyIsNotSet = errors.New("Pubkey is not set. Look for \"Can't get private validator pubkey\" errors")
)

//-----------------------------------------------------------------------------
Expand Down Expand Up @@ -95,6 +98,9 @@ type ConsensusState struct {
mtx sync.RWMutex
cstypes.RoundState
state sm.State // State until height-1.
// privValidator pubkey, memoized for the duration of one block
// to avoid extra requests to HSM
privValidatorPubKey crypto.PubKey

// state changes may be triggered by: msgs from peers,
// msgs from ourself, or by timeouts
Expand Down Expand Up @@ -251,11 +257,17 @@ func (cs *ConsensusState) GetValidators() (int64, []*types.Validator) {
return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators
}

// SetPrivValidator sets the private validator account for signing votes.
// SetPrivValidator sets the private validator account for signing votes. It
// immediately requests pubkey and caches it.
func (cs *ConsensusState) SetPrivValidator(priv types.PrivValidator) {
cs.mtx.Lock()
defer cs.mtx.Unlock()

cs.privValidator = priv
cs.mtx.Unlock()

if err := cs.updatePrivValidatorPubKey(); err != nil {
cs.Logger.Error("Can't get private validator pubkey", "err", err)
}
}

// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
Expand Down Expand Up @@ -872,8 +884,17 @@ func (cs *ConsensusState) enterPropose(height int64, round int) {
return
}

logger.Debug("This node is a validator")

if cs.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet))
return
}
address := cs.privValidatorPubKey.Address()

// if not a validator, we're done
address := cs.privValidator.GetPubKey().Address()
if !cs.Validators.HasAddress(address) {
logger.Debug("This node is not a validator", "addr", address, "vals", cs.Validators)
return
Expand Down Expand Up @@ -950,7 +971,12 @@ func (cs *ConsensusState) isProposalComplete() bool {
// is returned for convenience so we can log the proposal block.
// Returns nil block upon error.
// NOTE: keep it side-effect free for clarity.
// CONTRACT: cs.privValidator is not nil.
func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts *types.PartSet) {
if cs.privValidator == nil {
panic("entered createProposalBlock with privValidator being nil")
}

var commit *types.Commit
switch {
case cs.Height == 1:
Expand All @@ -960,13 +986,19 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
case cs.LastCommit.HasTwoThirdsMajority():
// Make the commit from LastCommit
commit = cs.LastCommit.MakeCommit()
default:
// This shouldn't happen.
cs.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block.")
default: // This shouldn't happen.
cs.Logger.Error("enterPropose: Cannot propose anything: No commit for the previous block")
return
}

proposerAddr := cs.privValidator.GetPubKey().Address()
if cs.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
cs.Logger.Error(fmt.Sprintf("enterPropose: %v", errPubKeyIsNotSet))
return
}
proposerAddr := cs.privValidatorPubKey.Address()

return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr)
}

Expand Down Expand Up @@ -1343,6 +1375,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) {

fail.Fail() // XXX

// Private validator might have changed it's key pair => refetch pubkey.
if err := cs.updatePrivValidatorPubKey(); err != nil {
cs.Logger.Error("Can't get private validator pubkey", "err", err)
}

// cs.StartTime is already set.
// Schedule Round0 to start soon.
cs.scheduleRound0(&cs.RoundState)
Expand All @@ -1356,8 +1393,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
func (cs *ConsensusState) recordMetrics(height int64, block *types.Block) {
cs.metrics.Validators.Set(float64(cs.Validators.Size()))
cs.metrics.ValidatorsPower.Set(float64(cs.Validators.TotalVotingPower()))
missingValidators := 0
missingValidatorsPower := int64(0)
var (
missingValidators = 0
missingValidatorsPower int64
)

for i, val := range cs.Validators.Validators {
var vote *types.CommitSig
if i < len(block.LastCommit.Precommits) {
Expand Down Expand Up @@ -1510,9 +1550,16 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
if err == ErrVoteHeightMismatch {
return added, err
} else if voteErr, ok := err.(*types.ErrVoteConflictingVotes); ok {
if cs.privValidatorPubKey == nil {
return false, errPubKeyIsNotSet
}
addr := cs.privValidator.GetPubKey().Address()
if bytes.Equal(vote.ValidatorAddress, addr) {
cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type)
cs.Logger.Error(
"Found conflicting vote from ourselves. Did you unsafe_reset a validator?",
"height", vote.Height,
"round", vote.Round,
"type", vote.Type)
return added, err
}
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
Expand Down Expand Up @@ -1688,7 +1735,10 @@ func (cs *ConsensusState) signVote(type_ types.SignedMsgType, hash []byte, heade
// Flush the WAL. Otherwise, we may not recompute the same vote to sign, and the privValidator will refuse to sign anything.
cs.wal.FlushAndSync()

addr := cs.privValidator.GetPubKey().Address()
if cs.privValidatorPubKey == nil {
return nil, errPubKeyIsNotSet
}
addr := cs.privValidatorPubKey.Address()
valIndex, _ := cs.Validators.GetByAddress(addr)

vote := &types.Vote{
Expand Down Expand Up @@ -1723,8 +1773,18 @@ func (cs *ConsensusState) voteTime() time.Time {

// sign the vote and publish on internalMsgQueue
func (cs *ConsensusState) signAddVote(type_ types.SignedMsgType, hash []byte, header types.PartSetHeader) *types.Vote {
// if we don't have a key or we're not in the validator set, do nothing
if cs.privValidator == nil || !cs.Validators.HasAddress(cs.privValidator.GetPubKey().Address()) {
if cs.privValidator == nil { // the node does not have a key
return nil
}

if cs.privValidatorPubKey == nil {
// Vote won't be signed, but it's not critical.
cs.Logger.Error(fmt.Sprintf("signAddVote: %v", errPubKeyIsNotSet))
return nil
}

// If the node not in the validator set, do nothing.
if !cs.Validators.HasAddress(cs.privValidatorPubKey.Address()) {
return nil
}
vote, err := cs.signVote(type_, hash, header)
Expand All @@ -1739,6 +1799,18 @@ func (cs *ConsensusState) signAddVote(type_ types.SignedMsgType, hash []byte, he
return nil
}

// updatePrivValidatorPubKey get's the private validator public key and
// memoizes it. This func returns an error if the private validator is not
// responding or responds with an error.
func (cs *ConsensusState) updatePrivValidatorPubKey() error {
if cs.privValidator == nil {
return nil
}

cs.privValidatorPubKey = cs.privValidator.GetPubKey()
return nil
}

//---------------------------------------------------------

func CompareHRS(h1 int64, r1 int, s1 cstypes.RoundStepType, h2 int64, r2 int, s2 cstypes.RoundStepType) int {
Expand Down
14 changes: 13 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,7 +1269,19 @@ func createAndStartPrivValidatorSocketClient(
return nil, errors.Wrap(err, "failed to start private validator")
}

return pvsc, nil
// try to get a pubkey from private validate first time
pubKey := pvsc.GetPubKey()
if pubKey == nil {
return nil, errors.New("could not retrieve public key from private validator")
}

const (
retries = 50 // 50 * 100ms = 5s total
timeout = 100 * time.Millisecond
)
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)

return pvscWithRetries, nil
}

// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
Expand Down
4 changes: 2 additions & 2 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {

n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}

// address without a protocol must result in error
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {

n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
}

// testFreeAddr claims a free port so we don't block on listener being ready.
Expand Down
14 changes: 8 additions & 6 deletions privval/errors.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package privval

import (
"errors"
"fmt"
)

// EndpointTimeoutError occurs when endpoint times out.
type EndpointTimeoutError struct{}

// Implement the net.Error interface.
Expand All @@ -13,15 +15,15 @@ func (e EndpointTimeoutError) Temporary() bool { return true }

// Socket errors.
var (
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
ErrNoConnection = fmt.Errorf("endpoint is not connected")
ErrConnectionTimeout = EndpointTimeoutError{}

ErrReadTimeout = fmt.Errorf("endpoint read timed out")
ErrWriteTimeout = fmt.Errorf("endpoint write timed out")
ErrNoConnection = errors.New("endpoint is not connected")
ErrReadTimeout = errors.New("endpoint read timed out")
ErrUnexpectedResponse = errors.New("empty response")
ErrWriteTimeout = errors.New("endpoint write timed out")
)

// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.
// RemoteSignerError allows (remote) validators to include meaningful error
// descriptions in their reply.
type RemoteSignerError struct {
// TODO(ismail): create an enum of known errors
Code int
Expand Down
85 changes: 85 additions & 0 deletions privval/retry_signer_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package privval

import (
"errors"
"time"

"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/types"
)

// RetrySignerClient wraps SignerClient adding retry for each operation (except
// Ping) w/ a timeout.
type RetrySignerClient struct {
next *SignerClient
retries int
timeout time.Duration
}

// NewRetrySignerClient returns RetrySignerClient. If +retries+ is 0, the
// client will be retrying each operation indefinitely.
func NewRetrySignerClient(sc *SignerClient, retries int, timeout time.Duration) *RetrySignerClient {
return &RetrySignerClient{sc, retries, timeout}
}

var _ types.PrivValidator = (*RetrySignerClient)(nil)

func (sc *RetrySignerClient) Close() error {
return sc.next.Close()
}

func (sc *RetrySignerClient) IsConnected() bool {
return sc.next.IsConnected()
}

func (sc *RetrySignerClient) WaitForConnection(maxWait time.Duration) error {
return sc.next.WaitForConnection(maxWait)
}

//--------------------------------------------------------
// Implement PrivValidator

func (sc *RetrySignerClient) Ping() error {
return sc.next.Ping()
}

func (sc *RetrySignerClient) GetPubKey() crypto.PubKey {
for i := 0; i < sc.retries || sc.retries == 0; i++ {
pk := sc.next.GetPubKey()
if pk != nil {
return pk
}
time.Sleep(sc.timeout)
}
return nil
}

func (sc *RetrySignerClient) SignVote(chainID string, vote *types.Vote) error {
for i := 0; i < sc.retries || sc.retries == 0; i++ {
err := sc.next.SignVote(chainID, vote)
if err == nil {
return nil
}
// If remote signer errors, we don't retry.
if _, ok := err.(*RemoteSignerError); ok {
return err
}
time.Sleep(sc.timeout)
}
return errors.New("exhausted all attempts to sign vote")
}

func (sc *RetrySignerClient) SignProposal(chainID string, proposal *types.Proposal) error {
for i := 0; i < sc.retries || sc.retries == 0; i++ {
err := sc.next.SignProposal(chainID, proposal)
if err == nil {
return nil
}
// If remote signer errors, we don't retry.
if _, ok := err.(*RemoteSignerError); ok {
return err
}
time.Sleep(sc.timeout)
}
return errors.New("exhausted all attempts to sign proposal")
}
Loading