Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): fix receiving notifications messages #1517

Merged
merged 19 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (bm *BlockAnnounceMessage) Type() byte {

// string formats a BlockAnnounceMessage as a string
func (bm *BlockAnnounceMessage) String() string {
return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%sx ExtrinsicsRoot=%s Digest=%v",
return fmt.Sprintf("BlockAnnounceMessage ParentHash=%s Number=%d StateRoot=%s ExtrinsicsRoot=%s Digest=%v",
bm.ParentHash,
bm.Number,
bm.StateRoot,
Expand Down
23 changes: 17 additions & 6 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"math/rand"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/network"
Expand All @@ -29,6 +30,10 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

var (
maxRetries = 12
)

// ConnManager implements connmgr.ConnManager
type ConnManager struct {
sync.Mutex
Expand Down Expand Up @@ -191,10 +196,18 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
Addrs: addrs,
}

err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
}
go func() {
for i := 0; i < maxRetries; i++ {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
time.Sleep(time.Minute)
continue
}

return
}
}()

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
}
Expand All @@ -207,7 +220,6 @@ func (cm *ConnManager) registerDisconnectHandler(cb func(peer.ID)) {
func (cm *ConnManager) OpenedStream(n network.Network, s network.Stream) {
logger.Trace(
"Opened stream",
"host", s.Conn().LocalPeer(),
"peer", s.Conn().RemotePeer(),
"protocol", s.Protocol(),
)
Expand All @@ -221,7 +233,6 @@ func (cm *ConnManager) registerCloseHandler(protocolID protocol.ID, cb func(id p
func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) {
logger.Trace(
"Closed stream",
"host", s.Conn().LocalPeer(),
"peer", s.Conn().RemotePeer(),
"protocol", s.Protocol(),
)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestPersistentPeers(t *testing.T) {
require.NotEqual(t, 0, len(conns))

// if A disconnects from B, B should reconnect
nodeA.host.h.Network().ClosePeer(nodeA.host.id())
nodeA.host.h.Network().ClosePeer(nodeB.host.id())
time.Sleep(time.Millisecond * 500)
conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id())
require.NotEqual(t, 0, len(conns))
Expand Down
4 changes: 4 additions & 0 deletions dot/network/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ func (bm *BlockRequestMessage) Decode(in []byte) error {
case *pb.BlockRequest_Hash:
startingBlock, err = variadic.NewUint64OrHash(common.BytesToHash(from.Hash))
case *pb.BlockRequest_Number:
// TODO: we are receiving block requests w/ 4-byte From field; did the format change?
if len(from.Number) != 8 {
return errors.New("invalid BlockResponseMessage.From; uint64 is not 8 bytes")
}
startingBlock, err = variadic.NewUint64OrHash(binary.LittleEndian.Uint64(from.Number))
default:
err = errors.New("invalid StartingBlock")
Expand Down
25 changes: 13 additions & 12 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
err := handshakeValidator(peer, hs)
if err != nil {
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Conn().Close()
return errCannotValidateHandshake
}

Expand All @@ -141,17 +140,17 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
// once validated, send back a handshake
resp, err := info.getHandshake()
if err != nil {
logger.Debug("failed to get handshake", "protocol", info.protocolID, "error", err)
logger.Warn("failed to get handshake", "protocol", info.protocolID, "error", err)
return err
}

err = s.host.send(peer, info.protocolID, resp)
err = s.host.writeToStream(stream, resp)
if err != nil {
logger.Trace("failed to send handshake", "protocol", info.protocolID, "peer", peer, "error", err)
_ = stream.Conn().Close()
return err
}
logger.Trace("receiver: sent handshake", "protocol", info.protocolID, "peer", peer)
return nil
}

// if we are the initiator and haven't received the handshake already, validate it
Expand All @@ -161,7 +160,6 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
if err != nil {
logger.Trace("failed to validate handshake", "protocol", info.protocolID, "peer", peer, "error", err)
hsData.validated = false
_ = stream.Conn().Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer close connection stream when there is an error, why? Is connection closing handled somewhere else? In network/service.go line 500, the stream is closed when there is an error. I'm just try to better understand how these streams are handled.

Copy link
Contributor Author

@noot noot Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this was duplicate code, if this function returns an error then readStream in service.go will close the stream:

err = handler(stream, msg)
if err != nil {
	logger.Debug("failed to handle message from stream", "message", msg, "error", err)
	_ = stream.Close()
	return
}

return errCannotValidateHandshake
}

Expand All @@ -175,7 +173,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
// if we are the initiator, send the message
if hsData, has := info.getHandshakeData(peer); has && hsData.validated && hsData.received && hsData.outboundMsg != nil {
logger.Trace("sender: sending message", "protocol", info.protocolID)
err := s.host.send(peer, info.protocolID, hsData.outboundMsg)
err := s.host.writeToStream(stream, hsData.outboundMsg)
if err != nil {
logger.Debug("failed to send message", "protocol", info.protocolID, "peer", peer, "error", err)
return err
Expand All @@ -197,11 +195,14 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
}

// TODO: improve this by keeping track of who you've received/sent messages from
if !s.noGossip {
seen := s.gossip.hasSeen(msg)
if !seen {
s.broadcastExcluding(info, peer, msg)
}
if s.noGossip {
return nil
}

seen := s.gossip.hasSeen(msg)
if !seen {
// TODO: update this to write to stream w/ handshake established
s.broadcastExcluding(info, peer, msg)
}

return nil
Expand Down Expand Up @@ -261,7 +262,7 @@ func (s *Service) broadcastExcluding(info *notificationsProtocol, excluding peer
}

if err != nil {
logger.Error("failed to send message to peer", "peer", peer, "error", err)
logger.Debug("failed to send message to peer", "peer", peer, "error", err)
}
}
}
55 changes: 10 additions & 45 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewService(cfg *Config) (*Service, error) {
}

network.syncQueue = newSyncQueue(network)

network.noGossip = true // TODO: remove once duplicate message sending is merged
return network, err
}

Expand Down Expand Up @@ -281,40 +281,8 @@ func (s *Service) logPeerCount() {

func (s *Service) handleConn(conn libp2pnetwork.Conn) {
// give new peers a slight weight
// TODO: do this once handshake is received
s.syncQueue.updatePeerScore(conn.RemotePeer(), 1)

s.notificationsMu.Lock()
defer s.notificationsMu.Unlock()

info, has := s.notificationsProtocols[BlockAnnounceMsgType]
if !has {
// this shouldn't happen
logger.Warn("block announce protocol is not yet registered!")
return
}

// open block announce substream
hs, err := info.getHandshake()
if err != nil {
logger.Warn("failed to get handshake", "protocol", blockAnnounceID, "error", err)
return
}

info.mapMu.RLock()
defer info.mapMu.RUnlock()

peer := conn.RemotePeer()
if hsData, has := info.getHandshakeData(peer); !has || !hsData.received { //nolint
info.handshakeData.Store(peer, &handshakeData{
validated: false,
})

logger.Trace("sending handshake", "protocol", info.protocolID, "peer", peer, "message", hs)
err = s.host.send(peer, info.protocolID, hs)
if err != nil {
logger.Trace("failed to send block announce handshake to peer", "peer", peer, "error", err)
}
}
}

func (s *Service) beginDiscovery() error {
Expand Down Expand Up @@ -528,7 +496,7 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
if err == io.EOF {
continue
} else if err != nil {
logger.Trace("failed to read from stream", "protocol", stream.Protocol(), "error", err)
logger.Trace("failed to read from stream", "peer", stream.Conn().RemotePeer(), "protocol", stream.Protocol(), "error", err)
_ = stream.Close()
return
}
Expand All @@ -541,21 +509,18 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, peer peer.ID, decoder
}

logger.Trace(
"Received message from peer",
"received message from peer",
"host", s.host.id(),
"peer", peer,
"msg", msg.String(),
)

go func() {
// handle message based on peer status and message type
err = handler(stream, msg)
if err != nil {
logger.Trace("Failed to handle message from stream", "message", msg, "error", err)
_ = stream.Close()
return
}
}()
err = handler(stream, msg)
if err != nil {
logger.Debug("failed to handle message from stream", "message", msg, "error", err)
_ = stream.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer the Close

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? we only want to close the stream if there's an error

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I assumed we would close the stream after reading once.

return
}
}
}

Expand Down
12 changes: 0 additions & 12 deletions dot/network/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,16 +438,4 @@ func TestHandleConn(t *testing.T) {
aScore, ok := nodeB.syncQueue.peerScore.Load(nodeA.host.id())
require.True(t, ok)
require.Equal(t, 1, aScore)

infoA := nodeA.notificationsProtocols[BlockAnnounceMsgType]
hsDataB, has := infoA.getHandshakeData(nodeB.host.id())
require.True(t, has)
require.True(t, hsDataB.received)
require.True(t, hsDataB.validated)

infoB := nodeB.notificationsProtocols[BlockAnnounceMsgType]
hsDataA, has := infoB.getHandshakeData(nodeA.host.id())
require.True(t, has)
require.True(t, hsDataA.received)
require.True(t, hsDataA.validated)
}
18 changes: 11 additions & 7 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package network

import (
"context"
"errors"
"fmt"
"reflect"
"sort"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/common/variadic"

"github.com/ChainSafe/chaindb"
libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -182,7 +184,7 @@ func (q *syncQueue) syncAtHead() {
for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration):
case <-time.After(q.slotDuration * 2):
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -689,14 +691,14 @@ func (q *syncQueue) handleBlockJustification(data []*types.BlockData) {
}

func (q *syncQueue) handleBlockData(data []*types.BlockData) {
bestNum, err := q.s.blockState.BestBlockNumber()
finalized, err := q.s.blockState.GetFinalizedHeader(0, 0)
if err != nil {
panic(err) // TODO: don't panic but try again. seems blockState needs better concurrency handling
}

end := data[len(data)-1].Number().Int64()
if end <= bestNum.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", bestNum.Int64())
if end <= finalized.Number.Int64() {
logger.Debug("ignoring block data that is below our head", "got", end, "head", finalized.Number.Int64())
q.pushRequest(uint64(end+1), blockRequestBufferSize, "")
return
}
Expand Down Expand Up @@ -736,7 +738,7 @@ func (q *syncQueue) handleBlockData(data []*types.BlockData) {
func (q *syncQueue) handleBlockDataFailure(idx int, err error, data []*types.BlockData) {
logger.Warn("failed to handle block data", "failed on block", q.currStart+int64(idx), "error", err)

if err.Error() == "failed to get parent hash: Key not found" { // TODO: unwrap err
if errors.Is(err, chaindb.ErrKeyNotFound) {
header, err := types.NewHeaderFromOptional(data[idx].Header)
if err != nil {
logger.Debug("failed to get header from BlockData", "idx", idx, "error", err)
Expand Down Expand Up @@ -787,7 +789,6 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

logger.Debug("received BlockAnnounce!", "number", msg.Number, "hash", header.Hash(), "from", from)
has, _ := q.s.blockState.HasBlockBody(header.Hash())
if has {
return
Expand All @@ -797,13 +798,16 @@ func (q *syncQueue) handleBlockAnnounce(msg *BlockAnnounceMessage, from peer.ID)
return
}

q.goal = header.Number.Int64()

bestNum, err := q.s.blockState.BestBlockNumber()
if err != nil {
logger.Error("failed to get best block number", "error", err)
return
}

q.goal = header.Number.Int64()
// TODO: if we're at the head, this should request by hash instead of number, since there will
// certainly be blocks with the same number.
q.pushRequest(uint64(bestNum.Int64()+1), blockRequestBufferSize, from)
}

Expand Down
6 changes: 4 additions & 2 deletions dot/network/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ChainSafe/gossamer/lib/common/optional"
"github.com/ChainSafe/gossamer/lib/utils"

"github.com/ChainSafe/chaindb"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -425,9 +426,10 @@ func TestSyncQueue_SyncAtHead(t *testing.T) {
q.stop()
time.Sleep(time.Second)
q.ctx = context.Background()
q.slotDuration = time.Millisecond * 100

go q.syncAtHead()
time.Sleep(time.Millisecond * 6100)
time.Sleep(q.slotDuration * 3)
select {
case req := <-q.requestCh:
require.Equal(t, uint64(2), req.req.StartingBlock.Uint64())
Expand Down Expand Up @@ -500,7 +502,7 @@ func TestSyncQueue_handleBlockDataFailure_MissingParent(t *testing.T) {
q.ctx = context.Background()

data := testBlockResponseMessage().BlockData
q.handleBlockDataFailure(0, fmt.Errorf("failed to get parent hash: Key not found"), data)
q.handleBlockDataFailure(0, fmt.Errorf("some error: %w", chaindb.ErrKeyNotFound), data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q.handleBlockDataFailure(0, chaindb.ErrKeyNotFound, data)

Copy link
Contributor Author

@noot noot Apr 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to test the errors.Is functionality in handleBlockDataFailure, which unwraps the error for you:

if errors.Is(err, chaindb.ErrKeyNotFound) {
    // do something
}

see https://golang.org/pkg/errors/

select {
case req := <-q.requestCh:
require.True(t, req.req.StartingBlock.IsHash())
Expand Down
2 changes: 1 addition & 1 deletion dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func readStream(stream libp2pnetwork.Stream, buf []byte) (int, error) {
}

if length == 0 {
return 0, err // TODO: return bytes read from readLEB128ToUint64
return 0, nil // msg length of 0 is allowed, for example transactions handshake
}

// TODO: check if length > len(buf), if so probably log.Crit
Expand Down
Loading