Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lib/grandpa): send NeighbourMessage to peers #1558

Merged
merged 8 commits into from
May 5, 2021
Merged
10 changes: 5 additions & 5 deletions dot/core/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type DigestHandler struct {
// block notification channels
imported chan *types.Block
importedID byte
finalised chan *types.Header
finalised chan *types.FinalisationInfo
finalisedID byte

// GRANDPA changes
Expand All @@ -68,7 +68,7 @@ type resume struct {
// NewDigestHandler returns a new DigestHandler
func NewDigestHandler(blockState BlockState, epochState EpochState, grandpaState GrandpaState, babe BlockProducer, verifier Verifier) (*DigestHandler, error) {
imported := make(chan *types.Block, 16)
finalised := make(chan *types.Header, 16)
finalised := make(chan *types.FinalisationInfo, 16)
iid, err := blockState.RegisterImportedChannel(imported)
if err != nil {
return nil, err
Expand Down Expand Up @@ -195,12 +195,12 @@ func (h *DigestHandler) handleBlockImport(ctx context.Context) {
func (h *DigestHandler) handleBlockFinalisation(ctx context.Context) {
for {
select {
case header := <-h.finalised:
if header == nil {
case info := <-h.finalised:
if info == nil || info.Header == nil {
continue
}

err := h.handleGrandpaChangesOnFinalization(header.Number)
err := h.handleGrandpaChangesOnFinalization(info.Header.Number)
if err != nil {
logger.Error("failed to handle grandpa changes on block finalisation", "error", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dot/core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type BlockState interface {
SetFinalizedHash(common.Hash, uint64, uint64) error
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
HighestCommonAncestor(a, b common.Hash) (common.Hash, error)
SubChain(start, end common.Hash) ([]common.Hash, error)
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/modules/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type BlockAPI interface {
GetJustification(hash common.Hash) ([]byte, error)
RegisterImportedChannel(ch chan<- *types.Block) (byte, error)
UnregisterImportedChannel(id byte)
RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error)
RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error)
UnregisterFinalizedChannel(id byte)
SubChain(start, end common.Hash) ([]common.Hash, error)
}
Expand Down
16 changes: 8 additions & 8 deletions dot/rpc/subscription/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,19 @@ func (l *BlockListener) Listen() {

// BlockFinalizedListener to handle listening for finalised blocks
type BlockFinalizedListener struct {
channel chan *types.Header
channel chan *types.FinalisationInfo
wsconn WSConnAPI
chanID byte
subID uint
}

// Listen implementation of Listen interface to listen for importedChan changes
func (l *BlockFinalizedListener) Listen() {
for header := range l.channel {
if header == nil {
for info := range l.channel {
if info == nil || info.Header == nil {
continue
}
head, err := modules.HeaderToJSON(*header)
head, err := modules.HeaderToJSON(*info.Header)
if err != nil {
logger.Error("failed to convert header to JSON", "error", err)
}
Expand All @@ -147,7 +147,7 @@ type ExtrinsicSubmitListener struct {
importedChan chan *types.Block
importedChanID byte
importedHash common.Hash
finalisedChan chan *types.Header
finalisedChan chan *types.FinalisationInfo
finalisedChanID byte
}

Expand Down Expand Up @@ -180,10 +180,10 @@ func (l *ExtrinsicSubmitListener) Listen() {

// listen for finalised headers
go func() {
for header := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, header.Hash()) {
for info := range l.finalisedChan {
if reflect.DeepEqual(l.importedHash, info.Header.Hash()) {
resM := make(map[string]interface{})
resM["finalised"] = header.Hash().String()
resM["finalised"] = info.Header.Hash().String()
l.wsconn.safeSend(newSubscriptionResponse(AuthorExtrinsicUpdates, l.subID, resM))
}
}
Expand Down
12 changes: 8 additions & 4 deletions dot/rpc/subscription/listeners_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestBlockListener_Listen(t *testing.T) {
}

func TestBlockFinalizedListener_Listen(t *testing.T) {
notifyChan := make(chan *types.Header)
notifyChan := make(chan *types.FinalisationInfo)
mockConnection := &MockWSConnAPI{}
bfl := BlockFinalizedListener{
channel: notifyChan,
Expand All @@ -113,14 +113,16 @@ func TestBlockFinalizedListener_Listen(t *testing.T) {

go bfl.Listen()

notifyChan <- header
notifyChan <- &types.FinalisationInfo{
Header: header,
}
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedResponse, mockConnection.lastMessage)
}

func TestExtrinsicSubmitListener_Listen(t *testing.T) {
notifyImportedChan := make(chan *types.Block)
notifyFinalizedChan := make(chan *types.Header)
notifyFinalizedChan := make(chan *types.FinalisationInfo)

mockConnection := &MockWSConnAPI{}
esl := ExtrinsicSubmitListener{
Expand Down Expand Up @@ -149,7 +151,9 @@ func TestExtrinsicSubmitListener_Listen(t *testing.T) {
time.Sleep(time.Millisecond * 10)
require.Equal(t, expectedImportedRespones, mockConnection.lastMessage)

notifyFinalizedChan <- header
notifyFinalizedChan <- &types.FinalisationInfo{
Header: header,
}
time.Sleep(time.Millisecond * 10)
resFinalised := map[string]interface{}{"finalised": block.Header.Hash().String()}
expectedFinalizedRespones := newSubscriptionResponse(AuthorExtrinsicUpdates, esl.subID, resFinalised)
Expand Down
4 changes: 2 additions & 2 deletions dot/rpc/subscription/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (c *WSConn) initBlockListener(reqID float64) (uint, error) {

func (c *WSConn) initBlockFinalizedListener(reqID float64) (uint, error) {
bfl := &BlockFinalizedListener{
channel: make(chan *types.Header),
channel: make(chan *types.FinalisationInfo),
wsconn: c,
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func (c *WSConn) initExtrinsicWatch(reqID float64, params interface{}) (uint, er
importedChan: make(chan *types.Block),
wsconn: c,
extrinsic: types.Extrinsic(extBytes),
finalisedChan: make(chan *types.Header),
finalisedChan: make(chan *types.FinalisationInfo),
}

if c.BlockAPI == nil {
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/subscription/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}
Expand Down
2 changes: 1 addition & 1 deletion dot/rpc/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (m *MockBlockAPI) RegisterImportedChannel(ch chan<- *types.Block) (byte, er
}
func (m *MockBlockAPI) UnregisterImportedChannel(id byte) {
}
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (m *MockBlockAPI) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
return 0, nil
}
func (m *MockBlockAPI) UnregisterFinalizedChannel(id byte) {}
Expand Down
8 changes: 4 additions & 4 deletions dot/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type BlockState struct {

// block notifiers
imported map[byte]chan<- *types.Block
finalised map[byte]chan<- *types.Header
finalised map[byte]chan<- *types.FinalisationInfo
importedLock sync.RWMutex
finalisedLock sync.RWMutex

Expand All @@ -65,7 +65,7 @@ func NewBlockState(db chaindb.Database, bt *blocktree.BlockTree) (*BlockState, e
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
imported: make(map[byte]chan<- *types.Block),
finalised: make(map[byte]chan<- *types.Header),
finalised: make(map[byte]chan<- *types.FinalisationInfo),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
}

Expand All @@ -85,7 +85,7 @@ func NewBlockStateFromGenesis(db chaindb.Database, header *types.Header) (*Block
baseState: NewBaseState(db),
db: chaindb.NewTable(db, blockPrefix),
imported: make(map[byte]chan<- *types.Block),
finalised: make(map[byte]chan<- *types.Header),
finalised: make(map[byte]chan<- *types.FinalisationInfo),
pruneKeyCh: make(chan *types.Header, pruneKeyBufferSize),
}

Expand Down Expand Up @@ -424,7 +424,7 @@ func (bs *BlockState) SetFinalizedHash(hash common.Hash, round, setID uint64) er
bs.Lock()
defer bs.Unlock()

go bs.notifyFinalized(hash)
go bs.notifyFinalized(hash, round, setID)
if round > 0 {
err := bs.SetRound(round)
if err != nil {
Expand Down
13 changes: 9 additions & 4 deletions dot/state/block_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (bs *BlockState) RegisterImportedChannel(ch chan<- *types.Block) (byte, err

// RegisterFinalizedChannel registers a channel for block notification upon block finalisation.
// It returns the channel ID (used for unregistering the channel)
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.Header) (byte, error) {
func (bs *BlockState) RegisterFinalizedChannel(ch chan<- *types.FinalisationInfo) (byte, error) {
bs.finalisedLock.RLock()

if len(bs.finalised) == 256 {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (bs *BlockState) notifyImported(block *types.Block) {
}
}

func (bs *BlockState) notifyFinalized(hash common.Hash) {
func (bs *BlockState) notifyFinalized(hash common.Hash, round, setID uint64) {
bs.finalisedLock.RLock()
defer bs.finalisedLock.RUnlock()

Expand All @@ -126,11 +126,16 @@ func (bs *BlockState) notifyFinalized(hash common.Hash) {
}

logger.Debug("notifying finalised block chans...", "chans", bs.finalised)
info := &types.FinalisationInfo{
Header: header,
Round: round,
SetID: setID,
}

for _, ch := range bs.finalised {
go func(ch chan<- *types.Header) {
go func(ch chan<- *types.FinalisationInfo) {
select {
case ch <- header:
case ch <- info:
default:
}
}(ch)
Expand Down
8 changes: 4 additions & 4 deletions dot/state/block_notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestImportChannel(t *testing.T) {
func TestFinalizedChannel(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)

ch := make(chan *types.Header, 3)
ch := make(chan *types.FinalisationInfo, 3)
id, err := bs.RegisterFinalizedChannel(ch)
require.NoError(t, err)

Expand Down Expand Up @@ -117,12 +117,12 @@ func TestFinalizedChannel_Multi(t *testing.T) {
bs := newTestBlockState(t, testGenesisHeader)

num := 5
chs := make([]chan *types.Header, num)
chs := make([]chan *types.FinalisationInfo, num)
ids := make([]byte, num)

var err error
for i := 0; i < num; i++ {
chs[i] = make(chan *types.Header)
chs[i] = make(chan *types.FinalisationInfo)
ids[i], err = bs.RegisterFinalizedChannel(chs[i])
require.NoError(t, err)
}
Expand All @@ -134,7 +134,7 @@ func TestFinalizedChannel_Multi(t *testing.T) {

for i, ch := range chs {

go func(i int, ch chan *types.Header) {
go func(i int, ch chan *types.FinalisationInfo) {
select {
case <-ch:
case <-time.After(testMessageTimeout):
Expand Down
7 changes: 7 additions & 0 deletions dot/types/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,10 @@ func DecodeGrandpaVoters(r io.Reader) (GrandpaVoters, error) {

return voters, nil
}

// FinalisationInfo represents information about what block was finalised in what round and setID
type FinalisationInfo struct {
Header *Header
Round uint64
SetID uint64
}
17 changes: 16 additions & 1 deletion lib/grandpa/grandpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ type Service struct {
justification map[uint64][]*SignedPrecommit // map of round number -> precommit round justification

// channels for communication with other services
in chan GrandpaMessage // only used to receive *VoteMessage
in chan GrandpaMessage // only used to receive *VoteMessage
finalisedCh chan *types.FinalisationInfo
finalisedChID byte
neighbourMessage *NeighbourMessage // cached neighbour message
}

// Config represents a GRANDPA service configuration
Expand Down Expand Up @@ -133,6 +136,12 @@ func NewService(cfg *Config) (*Service, error) {
return nil, err
}

finalisedCh := make(chan *types.FinalisationInfo, 16)
fid, err := cfg.BlockState.RegisterFinalizedChannel(finalisedCh)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
s := &Service{
ctx: ctx,
Expand All @@ -156,6 +165,8 @@ func NewService(cfg *Config) (*Service, error) {
in: make(chan GrandpaMessage, 128),
resumed: make(chan struct{}),
network: cfg.Network,
finalisedCh: finalisedCh,
finalisedChID: fid,
}

s.messageHandler = NewMessageHandler(s, s.blockState)
Expand Down Expand Up @@ -185,6 +196,7 @@ func (s *Service) Start() error {
}
}()

go s.sendNeighbourMessage()
return nil
}

Expand All @@ -195,6 +207,9 @@ func (s *Service) Stop() error {

s.cancel()

s.blockState.UnregisterFinalizedChannel(s.finalisedChID)
close(s.finalisedCh)

if !s.authority {
return nil
}
Expand Down
32 changes: 30 additions & 2 deletions lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package grandpa

import (
"fmt"
"time"

"github.com/ChainSafe/gossamer/dot/network"
"github.com/ChainSafe/gossamer/lib/common"
Expand All @@ -28,8 +29,9 @@ import (
)

var (
grandpaID protocol.ID = "/paritytech/grandpa/1"
messageID = network.ConsensusMsgType
grandpaID protocol.ID = "/paritytech/grandpa/1"
messageID = network.ConsensusMsgType
neighbourMessageInterval = time.Minute * 5
)

// Handshake is an alias for network.Handshake
Expand Down Expand Up @@ -141,3 +143,29 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) e

return nil
}

func (s *Service) sendNeighbourMessage() {
for {
select {
case <-time.After(neighbourMessageInterval):
if s.neighbourMessage == nil {
continue
}
case info := <-s.finalisedCh:
s.neighbourMessage = &NeighbourMessage{
Version: 1,
Round: info.Round,
SetID: info.SetID,
Number: uint32(info.Header.Number.Int64()),
}
}

cm, err := s.neighbourMessage.ToConsensusMessage()
if err != nil {
logger.Warn("failed to convert NeighbourMessage to network message", "error", err)
continue
}

s.network.SendMessage(cm)
}
}
Loading