Skip to content

Commit

Permalink
maintenance(dot/sync): add clearing of pending blocks set (ChainSafe#…
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Nov 26, 2021
1 parent dd08424 commit 88685cc
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 35 deletions.
16 changes: 15 additions & 1 deletion dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ type chainSync struct {
// disjoint set of blocks which are known but not ready to be processed
// ie. we only know the hash, number, or the parent block is unknown, or the body is unknown
// note: the block may have empty fields, as some data about it may be unknown
pendingBlocks DisjointBlockSet
pendingBlocks DisjointBlockSet
pendingBlockDoneCh chan<- struct{}

// bootstrap or tip (near-head)
state chainSyncState
Expand Down Expand Up @@ -192,11 +193,17 @@ func (cs *chainSync) start() {
time.Sleep(time.Millisecond * 100)
}

pendingBlockDoneCh := make(chan struct{})
cs.pendingBlockDoneCh = pendingBlockDoneCh
go cs.pendingBlocks.run(pendingBlockDoneCh)
go cs.sync()
go cs.logSyncSpeed()
}

func (cs *chainSync) stop() {
if cs.pendingBlockDoneCh != nil {
close(cs.pendingBlockDoneCh)
}
cs.cancel()
}

Expand Down Expand Up @@ -439,6 +446,13 @@ func (cs *chainSync) sync() {
logger.Debugf(
"discarding worker id %d: maximum retry count reached",
worker.id)

// if this worker was triggered due to a block in the pending blocks set,
// we want to remove it from the set, as we asked all our peers for it
// and none replied with the info we need.
if worker.pendingBlock != nil {
cs.pendingBlocks.removeBlock(worker.pendingBlock.hash)
}
continue
}

Expand Down
88 changes: 67 additions & 21 deletions dot/sync/disjoint_block_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ import (
"errors"
"math/big"
"sync"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
)

const (
// ttl is the time that a block can stay in this set before being cleared.
ttl = 10 * time.Minute
clearBlocksInterval = time.Minute
)

var (
errUnknownBlock = errors.New("cannot add justification for unknown block")
errSetAtLimit = errors.New("cannot add block; set is at capacity")
Expand All @@ -20,6 +27,7 @@ var (
// DisjointBlockSet represents a set of incomplete blocks, or blocks
// with an unknown parent. it is implemented by *disjointBlockSet
type DisjointBlockSet interface {
run(done <-chan struct{})
addHashAndNumber(common.Hash, *big.Int) error
addHeader(*types.Header) error
addBlock(*types.Block) error
Expand All @@ -44,6 +52,21 @@ type pendingBlock struct {
header *types.Header
body *types.Body
justification []byte

// the time when this block should be cleared from the set.
// if the block is re-added to the set, this time get updated.
clearAt time.Time
}

func newPendingBlock(hash common.Hash, number *big.Int,
header *types.Header, body *types.Body, clearAt time.Time) *pendingBlock {
return &pendingBlock{
hash: hash,
number: number,
header: header,
body: body,
clearAt: clearAt,
}
}

func (b *pendingBlock) toBlockData() *types.BlockData {
Expand Down Expand Up @@ -80,13 +103,41 @@ type disjointBlockSet struct {

// map of parent hash -> child hashes
parentToChildren map[common.Hash]map[common.Hash]struct{}

timeNow func() time.Time
}

func newDisjointBlockSet(limit int) *disjointBlockSet {
return &disjointBlockSet{
blocks: make(map[common.Hash]*pendingBlock),
parentToChildren: make(map[common.Hash]map[common.Hash]struct{}),
limit: limit,
timeNow: time.Now,
}
}

func (s *disjointBlockSet) run(done <-chan struct{}) {
ticker := time.NewTicker(clearBlocksInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
s.clearBlocks()
case <-done:
return
}
}
}

func (s *disjointBlockSet) clearBlocks() {
s.Lock()
defer s.Unlock()

for _, block := range s.blocks {
if s.timeNow().Sub(block.clearAt) > 0 {
s.removeBlockInner(block.hash)
}
}
}

Expand All @@ -104,19 +155,16 @@ func (s *disjointBlockSet) addHashAndNumber(hash common.Hash, number *big.Int) e
s.Lock()
defer s.Unlock()

if _, has := s.blocks[hash]; has {
if b, has := s.blocks[hash]; has {
b.clearAt = s.timeNow().Add(ttl)
return nil
}

if len(s.blocks) == s.limit {
return errSetAtLimit
}

s.blocks[hash] = &pendingBlock{
hash: hash,
number: number,
}

s.blocks[hash] = newPendingBlock(hash, number, nil, nil, s.timeNow().Add(ttl))
return nil
}

Expand All @@ -125,21 +173,17 @@ func (s *disjointBlockSet) addHeader(header *types.Header) error {
defer s.Unlock()

hash := header.Hash()
b, has := s.blocks[hash]
if has {
if b, has := s.blocks[hash]; has {
b.header = header
b.clearAt = s.timeNow().Add(ttl)
return nil
}

if len(s.blocks) == s.limit {
return errSetAtLimit
}

s.blocks[hash] = &pendingBlock{
hash: hash,
number: header.Number,
header: header,
}
s.blocks[hash] = newPendingBlock(hash, header.Number, header, nil, s.timeNow().Add(ttl))
s.addToParentMap(header.ParentHash, hash)
return nil
}
Expand All @@ -149,23 +193,18 @@ func (s *disjointBlockSet) addBlock(block *types.Block) error {
defer s.Unlock()

hash := block.Header.Hash()
b, has := s.blocks[hash]
if has {
if b, has := s.blocks[hash]; has {
b.header = &block.Header
b.body = &block.Body
b.clearAt = s.timeNow().Add(ttl)
return nil
}

if len(s.blocks) == s.limit {
return errSetAtLimit
}

s.blocks[hash] = &pendingBlock{
hash: hash,
number: block.Header.Number,
header: &block.Header,
body: &block.Body,
}
s.blocks[hash] = newPendingBlock(hash, block.Header.Number, &block.Header, &block.Body, s.timeNow().Add(ttl))
s.addToParentMap(block.Header.ParentHash, hash)
return nil
}
Expand All @@ -177,6 +216,7 @@ func (s *disjointBlockSet) addJustification(hash common.Hash, just []byte) error
b, has := s.blocks[hash]
if has {
b.justification = just
b.clearAt = time.Now().Add(ttl)
return nil
}

Expand All @@ -187,6 +227,12 @@ func (s *disjointBlockSet) addJustification(hash common.Hash, just []byte) error
func (s *disjointBlockSet) removeBlock(hash common.Hash) {
s.Lock()
defer s.Unlock()
s.removeBlockInner(hash)
}

// this function does not lock!!
// it should only be called by other functions in this file that lock the set beforehand.
func (s *disjointBlockSet) removeBlockInner(hash common.Hash) {
block, has := s.blocks[hash]
if !has {
return
Expand Down
54 changes: 42 additions & 12 deletions dot/sync/disjoint_block_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package sync
import (
"math/big"
"testing"
"time"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand All @@ -15,6 +16,9 @@ import (

func TestDisjointBlockSet(t *testing.T) {
s := newDisjointBlockSet(pendingBlocksLimit)
s.timeNow = func() time.Time {
return time.Time{}
}

hash := common.Hash{0xa, 0xb}
number := big.NewInt(100)
Expand All @@ -23,8 +27,9 @@ func TestDisjointBlockSet(t *testing.T) {
require.Equal(t, 1, s.size())

expected := &pendingBlock{
hash: hash,
number: number,
hash: hash,
number: number,
clearAt: time.Time{}.Add(ttl),
}
blocks := s.getBlocks()
require.Equal(t, 1, len(blocks))
Expand All @@ -36,10 +41,12 @@ func TestDisjointBlockSet(t *testing.T) {
s.addHeader(header)
require.True(t, s.hasBlock(header.Hash()))
require.Equal(t, 2, s.size())

expected = &pendingBlock{
hash: header.Hash(),
number: header.Number,
header: header,
hash: header.Hash(),
number: header.Number,
header: header,
clearAt: time.Time{}.Add(ttl),
}
require.Equal(t, expected, s.getBlock(header.Hash()))

Expand All @@ -51,9 +58,10 @@ func TestDisjointBlockSet(t *testing.T) {
s.addHeader(header2)
require.Equal(t, 3, s.size())
expected = &pendingBlock{
hash: header2.Hash(),
number: header2.Number,
header: header2,
hash: header2.Hash(),
number: header2.Number,
header: header2,
clearAt: time.Time{}.Add(ttl),
}
require.Equal(t, expected, s.getBlock(header2.Hash()))

Expand All @@ -64,10 +72,11 @@ func TestDisjointBlockSet(t *testing.T) {
s.addBlock(block)
require.Equal(t, 3, s.size())
expected = &pendingBlock{
hash: header2.Hash(),
number: header2.Number,
header: header2,
body: &block.Body,
hash: header2.Hash(),
number: header2.Number,
header: header2,
body: &block.Body,
clearAt: time.Time{}.Add(ttl),
}
require.Equal(t, expected, s.getBlock(header2.Hash()))

Expand Down Expand Up @@ -195,3 +204,24 @@ func TestDisjointBlockSet_getReadyDescendants_blockNotComplete(t *testing.T) {
require.Equal(t, block1.ToBlockData(), ready[0])
require.Equal(t, block2.ToBlockData(), ready[1])
}

func TestDisjointBlockSet_ClearBlocks(t *testing.T) {
s := newDisjointBlockSet(pendingBlocksLimit)

testHashA := common.Hash{0}
testHashB := common.Hash{1}

s.blocks[testHashA] = &pendingBlock{
hash: testHashA,
clearAt: time.Unix(1000, 0),
}
s.blocks[testHashB] = &pendingBlock{
hash: testHashB,
clearAt: time.Now().Add(ttl * 2),
}

s.clearBlocks()
require.Equal(t, 1, len(s.blocks))
_, has := s.blocks[testHashB]
require.True(t, has)
}
3 changes: 3 additions & 0 deletions dot/sync/tip_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
targetNumber: fin.Number,
direction: network.Descending,
requestData: bootstrapRequestData,
pendingBlock: block,
})
continue
}
Expand All @@ -193,6 +194,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
targetHash: block.hash,
targetNumber: block.number,
requestData: network.RequestedDataBody + network.RequestedDataJustification,
pendingBlock: block,
})
continue
}
Expand All @@ -217,6 +219,7 @@ func (s *tipSyncer) handleTick() ([]*worker, error) {
targetNumber: fin.Number,
direction: network.Descending,
requestData: bootstrapRequestData,
pendingBlock: block,
})
}

Expand Down
7 changes: 6 additions & 1 deletion dot/sync/tip_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func TestTipSyncer_handleTick_case1(t *testing.T) {

fin, _ := s.blockState.GetHighestFinalisedHeader()

// add pending blocks w/ only hash and number, lower than finalised should be removed
// add pending blocks w/ only hash and number, equal or lower than finalised should be removed
s.pendingBlocks.addHashAndNumber(common.Hash{0xa}, fin.Number)
s.pendingBlocks.addHashAndNumber(common.Hash{0xb}, big.NewInt(0).Add(fin.Number, big.NewInt(1)))

Expand All @@ -170,8 +170,10 @@ func TestTipSyncer_handleTick_case1(t *testing.T) {
targetNumber: fin.Number,
direction: network.Descending,
requestData: bootstrapRequestData,
pendingBlock: s.pendingBlocks.getBlock(common.Hash{0xb}),
},
}

w, err = s.handleTick()
require.NoError(t, err)
require.Equal(t, expected, w)
Expand All @@ -198,13 +200,15 @@ func TestTipSyncer_handleTick_case2(t *testing.T) {
targetNumber: header.Number,
direction: network.Ascending,
requestData: network.RequestedDataBody + network.RequestedDataJustification,
pendingBlock: s.pendingBlocks.getBlock(header.Hash()),
},
}
w, err := s.handleTick()
require.NoError(t, err)
require.Equal(t, expected, w)
require.True(t, s.pendingBlocks.hasBlock(header.Hash()))
}

func TestTipSyncer_handleTick_case3(t *testing.T) {
s := newTestTipSyncer(t)

Expand Down Expand Up @@ -248,6 +252,7 @@ func TestTipSyncer_handleTick_case3(t *testing.T) {
targetNumber: fin.Number,
direction: network.Descending,
requestData: bootstrapRequestData,
pendingBlock: s.pendingBlocks.getBlock(header.Hash()),
},
}

Expand Down
Loading

0 comments on commit 88685cc

Please sign in to comment.