Skip to content

Commit

Permalink
feat(lib/grandpa): fully verify justifications using GrandpaState (Ch…
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Apr 29, 2021
1 parent ebe6dec commit 028d25e
Show file tree
Hide file tree
Showing 17 changed files with 394 additions and 166 deletions.
35 changes: 28 additions & 7 deletions dot/core/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,20 @@ func NewDigestHandler(blockState BlockState, epochState EpochState, grandpaState
}

// Start starts the DigestHandler
func (h *DigestHandler) Start() {
func (h *DigestHandler) Start() error {
go h.handleBlockImport(h.ctx)
go h.handleBlockFinalisation(h.ctx)
return nil
}

// Stop stops the DigestHandler
func (h *DigestHandler) Stop() {
func (h *DigestHandler) Stop() error {
h.cancel()
h.blockState.UnregisterImportedChannel(h.importedID)
h.blockState.UnregisterFinalizedChannel(h.finalisedID)
close(h.imported)
close(h.finalised)
return nil
}

// NextGrandpaAuthorityChange returns the block number of the next upcoming grandpa authorities change.
Expand Down Expand Up @@ -145,6 +147,8 @@ func (h *DigestHandler) HandleConsensusDigest(d *types.ConsensusDigest, header *
return h.handleScheduledChange(d, header)
case types.GrandpaForcedChangeType:
return h.handleForcedChange(d, header)
case types.GrandpaOnDisabledType:
return nil // do nothing, as this is not implemented in substrate
case types.GrandpaPauseType:
return h.handlePause(d)
case types.GrandpaResumeType:
Expand Down Expand Up @@ -208,37 +212,49 @@ func (h *DigestHandler) handleBlockFinalisation(ctx context.Context) {

func (h *DigestHandler) handleGrandpaChangesOnImport(num *big.Int) error {
resume := h.grandpaResume
if resume != nil && num.Cmp(resume.atBlock) == 0 {
if resume != nil && num.Cmp(resume.atBlock) > -1 {
h.grandpaResume = nil
}

fc := h.grandpaForcedChange
if fc != nil && num.Cmp(fc.atBlock) == 0 {
if fc != nil && num.Cmp(fc.atBlock) > -1 {
err := h.grandpaState.IncrementSetID()
if err != nil {
return err
}

h.grandpaForcedChange = nil
curr, err := h.grandpaState.GetCurrentSetID()
if err != nil {
return err
}

logger.Debug("incremented grandpa set ID", "set ID", curr)
}

return nil
}

func (h *DigestHandler) handleGrandpaChangesOnFinalization(num *big.Int) error {
pause := h.grandpaPause
if pause != nil && num.Cmp(pause.atBlock) == 0 {
if pause != nil && num.Cmp(pause.atBlock) > -1 {
h.grandpaPause = nil
}

sc := h.grandpaScheduledChange
if sc != nil && num.Cmp(sc.atBlock) == 0 {
if sc != nil && num.Cmp(sc.atBlock) > -1 {
err := h.grandpaState.IncrementSetID()
if err != nil {
return err
}

h.grandpaScheduledChange = nil
curr, err := h.grandpaState.GetCurrentSetID()
if err != nil {
return err
}

logger.Debug("incremented grandpa set ID", "set ID", curr)
}

// if blocks get finalised before forced change takes place, disregard it
Expand Down Expand Up @@ -281,7 +297,11 @@ func (h *DigestHandler) handleScheduledChange(d *types.ConsensusDigest, header *
return err
}

return h.grandpaState.SetNextChange(types.NewGrandpaVotersFromAuthorities(auths), big.NewInt(0).Add(header.Number, big.NewInt(int64(sc.Delay))))
logger.Debug("setting GrandpaScheduledChange", "at block", big.NewInt(0).Add(header.Number, big.NewInt(int64(sc.Delay))))
return h.grandpaState.SetNextChange(
types.NewGrandpaVotersFromAuthorities(auths),
big.NewInt(0).Add(header.Number, big.NewInt(int64(sc.Delay))),
)
}

func (h *DigestHandler) handleForcedChange(d *types.ConsensusDigest, header *types.Header) error {
Expand Down Expand Up @@ -318,6 +338,7 @@ func (h *DigestHandler) handleForcedChange(d *types.ConsensusDigest, header *typ
return err
}

logger.Debug("setting GrandpaForcedChange", "at block", big.NewInt(0).Add(header.Number, big.NewInt(int64(fc.Delay))))
return h.grandpaState.SetNextChange(
types.NewGrandpaVotersFromAuthorities(auths),
big.NewInt(0).Add(header.Number, big.NewInt(int64(fc.Delay))),
Expand Down
1 change: 1 addition & 0 deletions dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,5 @@ type GrandpaState interface {
IncrementSetID() error
SetNextPause(number *big.Int) error
SetNextResume(number *big.Int) error
GetCurrentSetID() (uint64, error)
}
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func (q *syncQueue) handleBlockAnnounceHandshake(blockNum uint32, from peer.ID)

func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID) {
q.updatePeerScore(from, 1)
logger.Info("received BlockAnnounce", "number", msg.Number, "from", from)
logger.Debug("received BlockAnnounce", "number", msg.Number, "from", from)

header, err := types.NewHeader(msg.ParentHash, msg.StateRoot, msg.ExtrinsicsRoot, msg.Number, msg.Digest)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
if err != nil {
return nil, err
}
nodeSrvcs = append(nodeSrvcs, dh)

// create GRANDPA service
fg, err := createGRANDPAService(cfg, rt, stateSrvc, dh, ks.Gran, networkSrvc)
Expand Down
2 changes: 1 addition & 1 deletion dot/state/block_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (bs *BlockState) notifyFinalized(hash common.Hash) {
return
}

logger.Trace("notifying finalised block chans...", "chans", bs.finalised)
logger.Debug("notifying finalised block chans...", "chans", bs.finalised)

for _, ch := range bs.finalised {
go func(ch chan<- *types.Header) {
Expand Down
48 changes: 48 additions & 0 deletions dot/state/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func NewGrandpaStateFromGenesis(db chaindb.Database, genesisAuthorities []*types
return nil, err
}

err = s.setSetIDChangeAtBlock(genesisSetID, big.NewInt(0))
if err != nil {
return nil, err
}

return s, nil
}

Expand Down Expand Up @@ -180,6 +185,49 @@ func (s *GrandpaState) GetSetIDChange(setID uint64) (*big.Int, error) {
return big.NewInt(0).SetBytes(num), nil
}

// GetSetIDByBlockNumber returns the set ID for a given block number
func (s *GrandpaState) GetSetIDByBlockNumber(num *big.Int) (uint64, error) {
curr, err := s.GetCurrentSetID()
if err != nil {
return 0, err
}

for {
changeUpper, err := s.GetSetIDChange(curr + 1)
if err == chaindb.ErrKeyNotFound {
if curr == 0 {
return 0, nil
}
curr = curr - 1
continue
}
if err != nil {
return 0, err
}

changeLower, err := s.GetSetIDChange(curr)
if err != nil {
return 0, err
}

// if the given block number is greater or equal to the block number of the set ID change,
// return the current set ID
if num.Cmp(changeUpper) < 1 && num.Cmp(changeLower) == 1 {
return curr, nil
}

if num.Cmp(changeUpper) == 1 {
return curr + 1, nil
}

curr = curr - 1

if int(curr) < 0 {
return 0, nil
}
}
}

// SetNextPause sets the next grandpa pause at the given block number
func (s *GrandpaState) SetNextPause(number *big.Int) error {
return s.db.Put(pauseKey, number.Bytes())
Expand Down
41 changes: 41 additions & 0 deletions dot/state/grandpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func TestNewGrandpaStateFromGenesis(t *testing.T) {
auths, err := gs.GetAuthorities(currSetID)
require.NoError(t, err)
require.Equal(t, testAuths, auths)

num, err := gs.GetSetIDChange(0)
require.NoError(t, err)
require.Equal(t, big.NewInt(0), num)
}

func TestGrandpaState_SetNextChange(t *testing.T) {
Expand Down Expand Up @@ -81,3 +85,40 @@ func TestGrandpaState_IncrementSetID(t *testing.T) {
require.NoError(t, err)
require.Equal(t, genesisSetID+1, setID)
}

func TestGrandpaState_GetSetIDByBlockNumber(t *testing.T) {
db := NewInMemoryDB(t)
gs, err := NewGrandpaStateFromGenesis(db, testAuths)
require.NoError(t, err)

testAuths2 := []*types.GrandpaVoter{
{Key: kr.Bob().Public().(*ed25519.PublicKey), ID: 0},
}

err = gs.SetNextChange(testAuths2, big.NewInt(100))
require.NoError(t, err)

setID, err := gs.GetSetIDByBlockNumber(big.NewInt(50))
require.NoError(t, err)
require.Equal(t, genesisSetID, setID)

setID, err = gs.GetSetIDByBlockNumber(big.NewInt(100))
require.NoError(t, err)
require.Equal(t, genesisSetID, setID)

setID, err = gs.GetSetIDByBlockNumber(big.NewInt(101))
require.NoError(t, err)
require.Equal(t, genesisSetID+1, setID)

err = gs.IncrementSetID()
require.NoError(t, err)

setID, err = gs.GetSetIDByBlockNumber(big.NewInt(100))
require.NoError(t, err)
require.Equal(t, genesisSetID, setID)

setID, err = gs.GetSetIDByBlockNumber(big.NewInt(101))
require.NoError(t, err)
require.Equal(t, genesisSetID+1, setID)

}
24 changes: 24 additions & 0 deletions dot/state/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,30 @@ func (s *Service) Rewind(toBlock int64) error {
return err
}

// update the current grandpa set ID
prevSetID, err := s.Grandpa.GetCurrentSetID()
if err != nil {
return err
}

newSetID, err := s.Grandpa.GetSetIDByBlockNumber(header.Number)
if err != nil {
return err
}

err = s.Grandpa.setCurrentSetID(newSetID)
if err != nil {
return err
}

// remove previously set grandpa changes, need to go up to prevSetID+1 in case of a scheduled change
for i := newSetID + 1; i <= prevSetID+1; i++ {
err = s.Grandpa.db.Del(setIDChangeKey(i))
if err != nil {
return err
}
}

return s.Base.StoreBestBlockHash(newHead)
}

Expand Down
26 changes: 26 additions & 0 deletions dot/state/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ChainSafe/gossamer/lib/trie"
"github.com/ChainSafe/gossamer/lib/utils"

"github.com/ChainSafe/chaindb"
log "github.com/ChainSafe/log15"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -229,13 +230,38 @@ func TestService_Rewind(t *testing.T) {
err = serv.Start()
require.NoError(t, err)

err = serv.Grandpa.setCurrentSetID(3)
require.NoError(t, err)

err = serv.Grandpa.setSetIDChangeAtBlock(1, big.NewInt(5))
require.NoError(t, err)

err = serv.Grandpa.setSetIDChangeAtBlock(2, big.NewInt(8))
require.NoError(t, err)

err = serv.Grandpa.setSetIDChangeAtBlock(3, big.NewInt(10))
require.NoError(t, err)

AddBlocksToState(t, serv.Block, 12)
err = serv.Rewind(6)
require.NoError(t, err)

num, err := serv.Block.BestBlockNumber()
require.NoError(t, err)
require.Equal(t, big.NewInt(6), num)

setID, err := serv.Grandpa.GetCurrentSetID()
require.NoError(t, err)
require.Equal(t, uint64(1), setID)

_, err = serv.Grandpa.GetSetIDChange(1)
require.NoError(t, err)

_, err = serv.Grandpa.GetSetIDChange(2)
require.Equal(t, chaindb.ErrKeyNotFound, err)

_, err = serv.Grandpa.GetSetIDChange(3)
require.Equal(t, chaindb.ErrKeyNotFound, err)
}

func TestService_Import(t *testing.T) {
Expand Down
2 changes: 0 additions & 2 deletions dot/sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type BlockProducer interface {

// DigestHandler is the interface for the consensus digest handler
type DigestHandler interface {
Start()
Stop()
HandleConsensusDigest(*types.ConsensusDigest, *types.Header) error
}

Expand Down
9 changes: 7 additions & 2 deletions dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ func (s *Service) ProcessBlockData(data []*types.BlockData) (int, error) {
return i, err
}

// handle consensus digests for authority changes
if s.digestHandler != nil {
s.handleDigests(header)
}

if bd.Justification != nil && bd.Justification.Exists() {
logger.Debug("handling Justification...", "number", header.Number, "hash", bd.Hash)
s.handleJustification(header, bd.Justification.Value())
Expand Down Expand Up @@ -427,13 +432,13 @@ func (s *Service) handleDigests(header *types.Header) {
if d.Type() == types.ConsensusDigestType {
cd, ok := d.(*types.ConsensusDigest)
if !ok {
logger.Error("handleDigests", "index", i, "error", "cannot cast invalid consensus digest item")
logger.Error("handleDigests", "block number", header.Number, "index", i, "error", "cannot cast invalid consensus digest item")
continue
}

err := s.digestHandler.HandleConsensusDigest(cd, header)
if err != nil {
logger.Error("handleDigests", "index", i, "digest", cd, "error", err)
logger.Error("handleDigests", "block number", header.Number, "index", i, "digest", cd, "error", err)
}
}
}
Expand Down
6 changes: 1 addition & 5 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ func (h *Handler) SendBlockImport(bestHash string, height *big.Int) {

func (h *Handler) sendTelemtry() {
for _, c := range h.wsConn {
err := c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
if err != nil {
// TODO (ed) determine how to handle this error
fmt.Printf("ERROR connecting to telemetry %v\n", err)
}
_ = c.WriteMessage(websocket.TextMessage, h.buf.Bytes())
}
h.buf.Reset()
}
Loading

0 comments on commit 028d25e

Please sign in to comment.