Skip to content

Commit

Permalink
feat(lib/grandpa): send NeighbourMessage to peers (ChainSafe#1558)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored May 5, 2021
1 parent e749a8d commit 322ccf9
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 39 deletions.
10 changes: 5 additions & 5 deletions dot/core/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type DigestHandler struct {
// block notification channels
imported chan *types.Block
importedID byte
finalised chan *types.Header
finalised chan *types.FinalisationInfo
finalisedID byte

// GRANDPA changes
Expand All @@ -68,7 +68,7 @@ type resume struct {
// NewDigestHandler returns a new DigestHandler
func NewDigestHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState, babe BlockProducer, verifier Verifier) (*DigestHandler, error) {
imported := make(chan *types.Block, 16)
finalised := make(chan *types.Header, 16)
finalised := make(chan *types.FinalisationInfo, 16)
iid, err := blockState.RegisterImportedChannel(imported)
if err != nil {
return nil, err
Expand Down Expand Up @@ -195,12 +195,12 @@ func (h *DigestHandler) handleBlockImport(ctx context.Context) {
func (h *DigestHandler) handleBlockFinalisation(ctx context.Context) {
for {
select {
case header := <-h.finalised:
if header == nil {
case info := <-h.finalised:
if info == nil || info.Header == nil {
continue
}

err := h.handleGrandpaChangesOnFinalization(header.Number)
err := h.handleGrandpaChangesOnFinalization(info.Header.Number)
if err != nil {
logger.Error("failed to handle grandpa changes on block finalisation", "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type BlockState interface {
SetFinalizedHash(common.Hash, uint64, uint64) error
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
SubChain(start, end common.Hash) ([]common.Hash, error)
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type BlockAPI interface {
GetJustification(hash common.Hash) ([]byte, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
SubChain(start, end common.Hash) ([]common.Hash, error)
}
Expand Down
16 changes: 8 additions & 8 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ func (l *BlockListener) Listen() {

// BlockFinalizedListener to handle listening for finalised blocks
type BlockFinalizedListener struct {
channel chan *types.Header
channel chan *types.FinalisationInfo
wsconn WSConnAPI
chanID byte
subID uint
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockFinalizedListener) Listen() {
for header := range l.channel {
if header == nil {
for info := range l.channel {
if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*header)
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
Expand All @@ -147,7 +147,7 @@ type ExtrinsicSubmitListener struct {
importedChan chan *types.Block
importedChanID byte
importedHash common.Hash
finalisedChan chan *types.Header
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte
}

Expand Down Expand Up @@ -180,10 +180,10 @@ func (l *ExtrinsicSubmitListener) Listen() {

// listen for finalised headers
go func() {
for header := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, header.Hash()) {
for info := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = header.Hash().String()
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
Expand Down
12 changes: 8 additions & 4 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBlockListener_Listen(t *testing.T) {
}

func TestBlockFinalizedListener_Listen(t *testing.T) {
notifyChan := make(chan *types.Header)
notifyChan := make(chan *types.FinalisationInfo)
mockConnection := &MockWSConnAPI{}
bfl := BlockFinalizedListener{
channel: notifyChan,
Expand All @@ -113,14 +113,16 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {

go bfl.Listen()

notifyChan <- header
notifyChan <- &types.FinalisationInfo{
Header: header,
}
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedResponse, mockConnection.lastMessage)
}

func TestExtrinsicSubmitListener_Listen(t *testing.T) {
notifyImportedChan := make(chan *types.Block)
notifyFinalizedChan := make(chan *types.Header)
notifyFinalizedChan := make(chan *types.FinalisationInfo)

mockConnection := &MockWSConnAPI{}
esl := ExtrinsicSubmitListener{
Expand Down Expand Up @@ -149,7 +151,9 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedImportedRespones, mockConnection.lastMessage)

notifyFinalizedChan <- header
notifyFinalizedChan <- &types.FinalisationInfo{
Header: header,
}
time.Sleep(time.Millisecond * 10)
resFinalised := map[string]interface{}{"finalised": block.Header.Hash().String()}
expectedFinalizedRespones := newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, resFinalised)
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {

func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
channel: make(chan *types.FinalisationInfo),
wsconn: c,
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
importedChan: make(chan *types.Block),
wsconn: c,
extrinsic: types.Extrinsic(extBytes),
finalisedChan: make(chan *types.Header),
finalisedChan: make(chan *types.FinalisationInfo),
}

if c.BlockAPI == nil {
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/subscription/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}
Expand Down
8 changes: 4 additions & 4 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type BlockState struct {

// block notifiers
imported map[byte]chan<- *types.Block
finalised map[byte]chan<- *types.Header
finalised map[byte]chan<- *types.FinalisationInfo
importedLock sync.RWMutex
finalisedLock sync.RWMutex

Expand All @@ -65,7 +65,7 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
imported: make(map[byte]chan<- *types.Block),
finalised: make(map[byte]chan<- *types.Header),
finalised: make(map[byte]chan<- *types.FinalisationInfo),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
}

Expand All @@ -85,7 +85,7 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
imported: make(map[byte]chan<- *types.Block),
finalised: make(map[byte]chan<- *types.Header),
finalised: make(map[byte]chan<- *types.FinalisationInfo),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
}

Expand Down Expand Up @@ -424,7 +424,7 @@ func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) er
bs.Lock()
defer bs.Unlock()

go bs.notifyFinalized(hash)
go bs.notifyFinalized(hash, round, setID)
if round > 0 {
err := bs.SetRound(round)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions dot/state/block_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err

// RegisterFinalizedChannel registers a channel for block notification upon block finalisation.
// It returns the channel ID (used for unregistering the channel)
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
bs.finalisedLock.RLock()

if len(bs.finalised) == 256 {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (bs *BlockState) notifyImported(block *types.Block) {
}
}

func (bs *BlockState) notifyFinalized(hash common.Hash) {
func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) {
bs.finalisedLock.RLock()
defer bs.finalisedLock.RUnlock()

Expand All @@ -126,11 +126,16 @@ func (bs *BlockState) notifyFinalized(hash common.Hash) {
}

logger.Debug("notifying finalised block chans...", "chans", bs.finalised)
info := &types.FinalisationInfo{
Header: header,
Round: round,
SetID: setID,
}

for _, ch := range bs.finalised {
go func(ch chan<- *types.Header) {
go func(ch chan<- *types.FinalisationInfo) {
select {
case ch <- header:
case ch <- info:
default:
}
}(ch)
Expand Down
8 changes: 4 additions & 4 deletions dot/state/block_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestImportChannel(t *testing.T) {
func TestFinalizedChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)

ch := make(chan *types.Header, 3)
ch := make(chan *types.FinalisationInfo, 3)
id, err := bs.RegisterFinalizedChannel(ch)
require.NoError(t, err)

Expand Down Expand Up @@ -117,12 +117,12 @@ func TestFinalizedChannel_Multi(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)

num := 5
chs := make([]chan *types.Header, num)
chs := make([]chan *types.FinalisationInfo, num)
ids := make([]byte, num)

var err error
for i := 0; i < num; i++ {
chs[i] = make(chan *types.Header)
chs[i] = make(chan *types.FinalisationInfo)
ids[i], err = bs.RegisterFinalizedChannel(chs[i])
require.NoError(t, err)
}
Expand All @@ -134,7 +134,7 @@ func TestFinalizedChannel_Multi(t *testing.T) {

for i, ch := range chs {

go func(i int, ch chan *types.Header) {
go func(i int, ch chan *types.FinalisationInfo) {
select {
case <-ch:
case <-time.After(testMessageTimeout):
Expand Down
7 changes: 7 additions & 0 deletions dot/types/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,10 @@ func DecodeGrandpaVoters(r io.Reader) (GrandpaVoters, error) {

return voters, nil
}

// FinalisationInfo represents information about what block was finalised in what round and setID
type FinalisationInfo struct {
Header *Header
Round uint64
SetID uint64
}
17 changes: 16 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ type Service struct {
justification map[uint64][]*SignedPrecommit // map of round number -> precommit round justification

// channels for communication with other services
in chan GrandpaMessage // only used to receive *VoteMessage
in chan GrandpaMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
finalisedChID byte
neighbourMessage *NeighbourMessage // cached neighbour message
}

// Config represents a GRANDPA service configuration
Expand Down Expand Up @@ -133,6 +136,12 @@ func NewService(cfg *Config) (*Service, error) {
return nil, err
}

finalisedCh := make(chan *types.FinalisationInfo, 16)
fid, err := cfg.BlockState.RegisterFinalizedChannel(finalisedCh)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
s := &Service{
ctx: ctx,
Expand All @@ -156,6 +165,8 @@ func NewService(cfg *Config) (*Service, error) {
in: make(chan GrandpaMessage, 128),
resumed: make(chan struct{}),
network: cfg.Network,
finalisedCh: finalisedCh,
finalisedChID: fid,
}

s.messageHandler = NewMessageHandler(s, s.blockState)
Expand Down Expand Up @@ -185,6 +196,7 @@ func (s *Service) Start() error {
}
}()

go s.sendNeighbourMessage()
return nil
}

Expand All @@ -195,6 +207,9 @@ func (s *Service) Stop() error {

s.cancel()

s.blockState.UnregisterFinalizedChannel(s.finalisedChID)
close(s.finalisedCh)

if !s.authority {
return nil
}
Expand Down
32 changes: 30 additions & 2 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package grandpa

import (
"fmt"
"time"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/lib/common"
Expand All @@ -28,8 +29,9 @@ import (
)

var (
grandpaID protocol.ID = "/paritytech/grandpa/1"
messageID = network.ConsensusMsgType
grandpaID protocol.ID = "/paritytech/grandpa/1"
messageID = network.ConsensusMsgType
neighbourMessageInterval = time.Minute * 5
)

// Handshake is an alias for network.Handshake
Expand Down Expand Up @@ -160,3 +162,29 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (

return true, nil
}

func (s *Service) sendNeighbourMessage() {
for {
select {
case <-time.After(neighbourMessageInterval):
if s.neighbourMessage == nil {
continue
}
case info := <-s.finalisedCh:
s.neighbourMessage = &NeighbourMessage{
Version: 1,
Round: info.Round,
SetID: info.SetID,
Number: uint32(info.Header.Number.Int64()),
}
}

cm, err := s.neighbourMessage.ToConsensusMessage()
if err != nil {
logger.Warn("failed to convert NeighbourMessage to network message", "error", err)
continue
}

s.network.SendMessage(cm)
}
}
Loading

0 comments on commit 322ccf9

Please sign in to comment.