Skip to content

Commit

Permalink
fix(dot/state, lib/babe, lib/trie): improve syncing between gossamer …
Browse files Browse the repository at this point in the history
…authority nodes (ChainSafe#1613)
  • Loading branch information
noot authored Jun 9, 2021
1 parent 00a8df1 commit ca99fbf
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 99 deletions.
1 change: 0 additions & 1 deletion dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func createTestService(t *testing.T, cfg *Config) (srvc *Service) {

t.Cleanup(func() {
srvc.Stop()
time.Sleep(time.Second)
err = os.RemoveAll(cfg.BasePath)
if err != nil {
fmt.Printf("failed to remove path %s : %s\n", cfg.BasePath, err)
Expand Down
7 changes: 7 additions & 0 deletions dot/rpc/modules/dev_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ func newBABEService(t *testing.T) *babe.Service {
EpochState: es,
Keypair: kr.Alice().(*sr25519.Keypair),
Runtime: rt,
IsDev: true,
}

babe, err := babe.NewService(cfg)
require.NoError(t, err)
err = babe.Start()
require.NoError(t, err)
t.Cleanup(func() {
_ = babe.Stop()
})
return babe
}

func TestDevControl_Babe(t *testing.T) {
t.Skip() // skip for now, blocks on `babe.Service.Resume()`
bs := newBABEService(t)
m := NewDevModule(bs, nil)

Expand Down
16 changes: 3 additions & 13 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ func finalizedHashKey(round, setID uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, round)
key := append(common.FinalizedBlockHashKey, buf...)
binary.LittleEndian.PutUint64(buf, setID)
return append(key, buf...)
buf2 := make([]byte, 8)
binary.LittleEndian.PutUint64(buf2, setID)
return append(key, buf2...)
}

// GenesisHash returns the hash of the genesis block
Expand Down Expand Up @@ -368,17 +369,6 @@ func (bs *BlockState) SetBlockBody(hash common.Hash, body *types.Body) error {

// HasFinalizedBlock returns true if there is a finalised block for a given round and setID, false otherwise
func (bs *BlockState) HasFinalizedBlock(round, setID uint64) (bool, error) {
// get current round
r, err := bs.GetRound()
if err != nil {
return false, err
}

// round that is being queried for has not yet finalised
if round > r {
return false, fmt.Errorf("round not yet finalised")
}

return bs.db.Has(finalizedHashKey(round, setID))
}

Expand Down
17 changes: 8 additions & 9 deletions dot/state/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (s *StorageState) pruneKey(keyHeader *types.Header) {
// StoreTrie stores the given trie in the StorageState and writes it to the database
func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
s.lock.Lock()
defer s.lock.Unlock()

root := ts.MustRoot()
if s.syncing {
// keep only the trie at the head of the chain when syncing
Expand All @@ -105,17 +107,15 @@ func (s *StorageState) StoreTrie(ts *rtstorage.TrieState) error {
}
}
s.tries[root] = ts.Trie()
s.lock.Unlock()

logger.Trace("cached trie in storage state", "root", root)
logger.Debug("cached trie in storage state", "root", root)

if err := ts.Trie().WriteDirty(s.db); err != nil {
if err := s.tries[root].WriteDirty(s.db); err != nil {
logger.Warn("failed to write trie to database", "root", root, "error", err)
return err
}

go s.notifyAll(root)

return nil
}

Expand Down Expand Up @@ -146,15 +146,14 @@ func (s *StorageState) TrieState(root *common.Hash) (*rtstorage.TrieState, error
}
}

curr, err := rtstorage.NewTrieState(t)
nextTrie := t.Snapshot()
next, err := rtstorage.NewTrieState(nextTrie)
if err != nil {
return nil, err
}

s.lock.Lock()
s.tries[*root] = curr.Snapshot()
s.lock.Unlock()
return curr, nil
logger.Trace("returning trie to be modified", "root", root, "next", next.MustRoot())
return next, nil
}

// LoadFromDB loads an encoded trie from the DB where the key is `root`
Expand Down
4 changes: 2 additions & 2 deletions dot/state/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestStorage_StoreAndLoadTrie(t *testing.T) {
require.NoError(t, err)
ts2, err := runtime.NewTrieState(trie)
require.NoError(t, err)
ts2.Snapshot()
require.Equal(t, ts.Trie(), ts2.Trie())
new := ts2.Snapshot()
require.Equal(t, ts.Trie(), new)
}

func TestStorage_GetStorageByBlockHash(t *testing.T) {
Expand Down
18 changes: 9 additions & 9 deletions dot/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestTrieSnapshot(t *testing.T) {
require.NoError(t, err)

// Take Snapshot of the trie.
ssTrie := tri.Snapshot()
newTrie := tri.Snapshot()

// Get the Trie root hash for all the 3 tries.
tHash, err := tri.Hash()
Expand All @@ -179,16 +179,16 @@ func TestTrieSnapshot(t *testing.T) {
dcTrieHash, err := dcTrie.Hash()
require.NoError(t, err)

ssTrieHash, err := ssTrie.Hash()
newTrieHash, err := newTrie.Hash()
require.NoError(t, err)

// Root hash for all the 3 tries should be equal.
// Root hash for the 3 tries should be equal.
require.Equal(t, tHash, dcTrieHash)
require.Equal(t, dcTrieHash, ssTrieHash)
require.Equal(t, tHash, newTrieHash)

// Modify the current trie.
value[0] = 'w'
tri.Put(key, value)
newTrie.Put(key, value)

// Get the updated root hash of all tries.
tHash, err = tri.Hash()
Expand All @@ -197,11 +197,11 @@ func TestTrieSnapshot(t *testing.T) {
dcTrieHash, err = dcTrie.Hash()
require.NoError(t, err)

ssTrieHash, err = ssTrie.Hash()
newTrieHash, err = newTrie.Hash()
require.NoError(t, err)

// Only the current trie should have a different root hash since it is updated.
require.NotEqual(t, tHash, dcTrieHash)
require.NotEqual(t, tHash, ssTrieHash)
require.Equal(t, dcTrieHash, ssTrieHash)
require.NotEqual(t, newTrieHash, dcTrieHash)
require.NotEqual(t, newTrieHash, tHash)
require.Equal(t, dcTrieHash, tHash)
}
52 changes: 20 additions & 32 deletions lib/babe/babe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/crypto/sr25519"
"github.com/ChainSafe/gossamer/lib/runtime"
rtstorage "github.com/ChainSafe/gossamer/lib/runtime/storage"
log "github.com/ChainSafe/log15"
)

Expand Down Expand Up @@ -66,7 +65,7 @@ type Service struct {
blockChan chan types.Block // send blocks to core service

// State variables
lock sync.Mutex
sync.RWMutex
pause chan struct{}
}

Expand Down Expand Up @@ -231,20 +230,18 @@ func (b *Service) Pause() error {
return errors.New("service already paused")
}

select {
case b.pause <- struct{}{}:
logger.Info("service paused")
default:
}
b.Lock()
defer b.Unlock()

b.pause <- struct{}{}
b.paused = true
return nil
}

// Resume resumes the service ie. resumes block production
func (b *Service) Resume() error {
if !b.paused {
return errors.New("service not paused")
return nil
}

epoch, err := b.epochState.GetCurrentEpoch()
Expand All @@ -253,16 +250,19 @@ func (b *Service) Resume() error {
return err
}

go b.initiate(epoch)
b.Lock()
defer b.Unlock()

b.paused = false
logger.Info("service resumed")
go b.initiate(epoch)
logger.Info("service resumed", "epoch", epoch)
return nil
}

// Stop stops the service. If stop is called, it cannot be resumed.
func (b *Service) Stop() error {
b.lock.Lock()
defer b.lock.Unlock()
b.Lock()
defer b.Unlock()

if b.ctx.Err() != nil {
return errors.New("service already stopped")
Expand All @@ -285,7 +285,7 @@ func (b *Service) GetBlockChannel() <-chan types.Block {

// SetOnDisabled sets the block producer with the given index as disabled
// If this is our node, we stop producing blocks
func (b *Service) SetOnDisabled(authorityIndex uint32) {
func (b *Service) SetOnDisabled(authorityIndex uint32) { // TODO: remove this
if authorityIndex == b.epochData.authorityIndex {
b.isDisabled = true
}
Expand All @@ -303,18 +303,14 @@ func (b *Service) IsStopped() bool {

// IsPaused returns if the service is paused or not (ie. producing blocks)
func (b *Service) IsPaused() bool {
b.RLock()
defer b.RUnlock()
return b.paused
}

func (b *Service) safeSend(msg types.Block) error {
defer func() {
if err := recover(); err != nil {
logger.Error("recovered from panic", "error", err)
}
}()

b.lock.Lock()
defer b.lock.Unlock()
b.Lock()
defer b.Unlock()

if b.IsStopped() {
return errors.New("Service has been stopped")
Expand Down Expand Up @@ -445,7 +441,7 @@ func (b *Service) invokeBlockAuthoring(epoch uint64) {
}

func (b *Service) handleSlot(slotNum uint64) error {
if b.slotToProof[slotNum] == nil {
if _, has := b.slotToProof[slotNum]; !has {
return ErrNotAuthorized
}

Expand Down Expand Up @@ -488,21 +484,13 @@ func (b *Service) handleSlot(slotNum uint64) error {
return nil
}

old := ts.Snapshot()

// block built successfully, store resulting trie in storage state
oldTs, err := rtstorage.NewTrieState(old)
if err != nil {
return err
}

err = b.storageState.StoreTrie(oldTs)
err = b.storageState.StoreTrie(ts)
if err != nil {
logger.Error("failed to store trie in storage state", "error", err)
}

hash := block.Header.Hash()
logger.Info("built block", "hash", hash.String(), "number", block.Header.Number, "slot", slotNum)
logger.Info("built block", "hash", hash.String(), "number", block.Header.Number, "state root", block.Header.StateRoot, "slot", slotNum)
logger.Debug("built block", "header", block.Header, "body", block.Body, "parent", parent.Hash())

err = b.blockState.AddBlock(block)
Expand Down
8 changes: 2 additions & 6 deletions lib/babe/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,11 @@ func (b *Service) buildBlock(parent *types.Header, slot Slot) (*types.Block, err
b.slotToProof,
b.epochData.authorityIndex,
)

if err != nil {
return nil, errors.New("There was an error creating block builder - " + err.Error())
return nil, fmt.Errorf("failed to create block builder: %w", err)
}

block, err := builder.buildBlock(parent, slot)

return block, err

return builder.buildBlock(parent, slot)
}

// nolint
Expand Down
5 changes: 3 additions & 2 deletions lib/runtime/storage/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *TrieState) Trie() *trie.Trie {

// Snapshot creates a new "version" of the trie. The trie before Snapshot is called
// can no longer be modified, all further changes are on a new "version" of the trie.
// It returns the previous version of the trie.
// It returns the new version of the trie.
func (s *TrieState) Snapshot() *trie.Trie {
return s.t.Snapshot()
}
Expand All @@ -61,7 +61,8 @@ func (s *TrieState) Snapshot() *trie.Trie {
func (s *TrieState) BeginStorageTransaction() {
s.lock.Lock()
defer s.lock.Unlock()
s.oldTrie = s.t.Snapshot()
s.oldTrie = s.t
s.t = s.t.Snapshot()
}

// CommitStorageTransaction commits all storage changes made since BeginStorageTransaction was called.
Expand Down
7 changes: 3 additions & 4 deletions lib/trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ func NewTrie(root node) *Trie {

// Snapshot created a copy of the trie.
func (t *Trie) Snapshot() *Trie {
oldTrie := &Trie{
generation: t.generation,
newTrie := &Trie{
generation: t.generation + 1,
root: t.root,
childTries: t.childTries,
}
t.generation++
return oldTrie
return newTrie
}

func (t *Trie) maybeUpdateLeafGeneration(n *leaf) *leaf {
Expand Down
Loading

0 comments on commit ca99fbf

Please sign in to comment.