Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deepsource errors. #1789

Merged
merged 5 commits into from
Sep 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions dot/network/block_announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type BlockAnnounceMessage struct {
}

// SubProtocol returns the block-announces sub-protocol
func (bm *BlockAnnounceMessage) SubProtocol() string {
func (*BlockAnnounceMessage) SubProtocol() string {
return blockAnnounceID
}

// Type returns BlockAnnounceMsgType
func (bm *BlockAnnounceMessage) Type() byte {
func (*BlockAnnounceMessage) Type() byte {
return BlockAnnounceMsgType
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (bm *BlockAnnounceMessage) Hash() common.Hash {
}

// IsHandshake returns false
func (bm *BlockAnnounceMessage) IsHandshake() bool {
func (*BlockAnnounceMessage) IsHandshake() bool {
return false
}

Expand Down Expand Up @@ -137,7 +137,7 @@ type BlockAnnounceHandshake struct {
}

// SubProtocol returns the block-announces sub-protocol
func (hs *BlockAnnounceHandshake) SubProtocol() string {
func (*BlockAnnounceHandshake) SubProtocol() string {
return blockAnnounceID
}

Expand Down Expand Up @@ -170,17 +170,17 @@ func (hs *BlockAnnounceHandshake) Decode(in []byte) error {
}

// Type ...
func (hs *BlockAnnounceHandshake) Type() byte {
func (*BlockAnnounceHandshake) Type() byte {
return 0
}

// Hash ...
func (hs *BlockAnnounceHandshake) Hash() common.Hash {
func (*BlockAnnounceHandshake) Hash() common.Hash {
return common.Hash{}
}

// IsHandshake returns true
func (hs *BlockAnnounceHandshake) IsHandshake() bool {
func (*BlockAnnounceHandshake) IsHandshake() bool {
return true
}

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *Service) validateBlockAnnounceHandshake(peer peer.ID, hs Handshake) err

// don't need to lock here, since function is always called inside the func returned by
// `createNotificationsMessageHandler` which locks the map beforehand.
data, ok := np.getHandshakeData(peer, true)
data, ok := np.getInboundHandshakeData(peer)
if ok {
data.handshake = hs
// TODO: since this is used only for rpc system_peers only,
Expand Down
6 changes: 3 additions & 3 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGossip(t *testing.T) {

nodeA := createTestService(t, configA)
handlerA := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeA.host.registerStreamHandler("", handlerA.handleStream)
nodeA.host.registerStreamHandler(nodeA.host.protocolID, handlerA.handleStream)

basePathB := utils.NewTestBasePath(t, "nodeB")
configB := &Config{
Expand All @@ -54,7 +54,7 @@ func TestGossip(t *testing.T) {

nodeB := createTestService(t, configB)
handlerB := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeB.host.registerStreamHandler("", handlerB.handleStream)
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handlerB.handleStream)

addrInfoA := nodeA.host.addrInfo()
err := nodeB.host.connect(addrInfoA)
Expand All @@ -75,7 +75,7 @@ func TestGossip(t *testing.T) {

nodeC := createTestService(t, configC)
handlerC := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeC.host.registerStreamHandler("", handlerC.handleStream)
nodeC.host.registerStreamHandler(nodeC.host.protocolID, handlerC.handleStream)

err = nodeC.host.connect(addrInfoA)
// retry connect if "failed to dial" error
Expand Down
24 changes: 8 additions & 16 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,9 @@ func (h *host) close() error {
return nil
}

// registerStreamHandler registers the stream handler, appending the given sub-protocol to the main protocol ID
func (h *host) registerStreamHandler(sub protocol.ID, handler func(libp2pnetwork.Stream)) {
h.h.SetStreamHandler(h.protocolID+sub, handler)
}

// registerStreamHandlerWithOverwrite registers the stream handler. if overwrite is true, it uses the passed protocol ID
// for the handler, otherwise it appends the given sub-protocol to the main protocol ID
func (h *host) registerStreamHandlerWithOverwrite(pid protocol.ID, overwrite bool, handler func(libp2pnetwork.Stream)) {
if overwrite {
h.h.SetStreamHandler(pid, handler)
} else {
h.h.SetStreamHandler(h.protocolID+pid, handler)
}
// registerStreamHandler registers the stream handler for the given protocol id.
func (h *host) registerStreamHandler(pid protocol.ID, handler func(libp2pnetwork.Stream)) {
h.h.SetStreamHandler(pid, handler)
}

// connect connects the host to a specific peer address
Expand All @@ -251,16 +241,18 @@ func (h *host) connect(p peer.AddrInfo) (err error) {
// bootstrap connects the host to the configured bootnodes
func (h *host) bootstrap() {
failed := 0
all := append(h.bootnodes, h.persistentPeers...)
for _, addrInfo := range all {
var allNodes []peer.AddrInfo
allNodes = append(allNodes, h.bootnodes...)
allNodes = append(allNodes, h.persistentPeers...)
for _, addrInfo := range allNodes {
Comment on lines +244 to +247
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did deepsource complain about this? just curious what the issue was

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.Debug("bootstrapping to peer", "peer", addrInfo.ID)
err := h.connect(addrInfo)
if err != nil {
logger.Debug("failed to bootstrap to peer", "error", err)
failed++
}
}
if failed == len(all) && len(all) != 0 {
if failed == len(allNodes) && len(allNodes) != 0 {
logger.Error("failed to bootstrap to any bootnode")
}
}
Expand Down
12 changes: 6 additions & 6 deletions dot/network/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestSend(t *testing.T) {
nodeB := createTestService(t, configB)
nodeB.noGossip = true
handler := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeB.host.registerStreamHandler("", handler.handleStream)
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handler.handleStream)

addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestExistingStream(t *testing.T) {
nodeA := createTestService(t, configA)
nodeA.noGossip = true
handlerA := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeA.host.registerStreamHandler("", handlerA.handleStream)
nodeA.host.registerStreamHandler(nodeA.host.protocolID, handlerA.handleStream)

addrInfoA := nodeA.host.addrInfo()
basePathB := utils.NewTestBasePath(t, "nodeB")
Expand All @@ -237,7 +237,7 @@ func TestExistingStream(t *testing.T) {
nodeB := createTestService(t, configB)
nodeB.noGossip = true
handlerB := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeB.host.registerStreamHandler("", handlerB.handleStream)
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handlerB.handleStream)

addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestStreamCloseMetadataCleanup(t *testing.T) {
})

// Verify that handshake data exists.
_, ok := info.getHandshakeData(nodeB.host.id(), true)
_, ok := info.getInboundHandshakeData(nodeB.host.id())
require.True(t, ok)

time.Sleep(time.Second)
Expand All @@ -339,7 +339,7 @@ func TestStreamCloseMetadataCleanup(t *testing.T) {
time.Sleep(time.Second)

// Verify that handshake data is cleared.
_, ok = info.getHandshakeData(nodeB.host.id(), true)
_, ok = info.getInboundHandshakeData(nodeB.host.id())
require.False(t, ok)
}

Expand Down Expand Up @@ -501,7 +501,7 @@ func TestStreamCloseEOF(t *testing.T) {
nodeB := createTestService(t, configB)
nodeB.noGossip = true
handler := newTestStreamHandler(testBlockRequestMessageDecoder)
nodeB.host.registerStreamHandler("", handler.handleStream)
nodeB.host.registerStreamHandler(nodeB.host.protocolID, handler.handleStream)
require.False(t, handler.exit)

addrInfoB := nodeB.host.addrInfo()
Expand Down
44 changes: 33 additions & 11 deletions dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,20 @@ package network

import (
"errors"
"reflect"
"sync"
"time"
"unsafe"

libp2pnetwork "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)

var errCannotValidateHandshake = errors.New("failed to validate handshake")
var (
errCannotValidateHandshake = errors.New("failed to validate handshake")
maxHandshakeSize = reflect.TypeOf(BlockAnnounceHandshake{}).Size()
)

const maxHandshakeSize = unsafe.Sizeof(BlockAnnounceHandshake{}) //nolint
const handshakeTimeout = time.Second * 10

// Handshake is the interface all handshakes for notifications protocols must implement
Expand Down Expand Up @@ -70,18 +72,27 @@ type notificationsProtocol struct {
outboundHandshakeData *sync.Map //map[peer.ID]*handshakeData
}

func (n *notificationsProtocol) getHandshakeData(pid peer.ID, inbound bool) (handshakeData, bool) {
func (n *notificationsProtocol) getInboundHandshakeData(pid peer.ID) (handshakeData, bool) {
var (
data interface{}
has bool
)

if inbound {
data, has = n.inboundHandshakeData.Load(pid)
} else {
data, has = n.outboundHandshakeData.Load(pid)
data, has = n.inboundHandshakeData.Load(pid)
if !has {
return handshakeData{}, false
}

return data.(handshakeData), true
}

func (n *notificationsProtocol) getOutboundHandshakeData(pid peer.ID) (handshakeData, bool) {
var (
data interface{}
has bool
)

data, has = n.outboundHandshakeData.Load(pid)
if !has {
return handshakeData{}, false
}
Expand Down Expand Up @@ -110,7 +121,18 @@ func createDecoder(info *notificationsProtocol, handshakeDecoder HandshakeDecode
return func(in []byte, peer peer.ID, inbound bool) (Message, error) {
// if we don't have handshake data on this peer, or we haven't received the handshake from them already,
// assume we are receiving the handshake
if hsData, has := info.getHandshakeData(peer, inbound); !has || !hsData.received {
var (
hsData handshakeData
has bool
)

if inbound {
hsData, has = info.getInboundHandshakeData(peer)
} else {
hsData, has = info.getOutboundHandshakeData(peer)
}

if !has || !hsData.received {
return handshakeDecoder(in)
}

Expand Down Expand Up @@ -150,7 +172,7 @@ func (s *Service) createNotificationsMessageHandler(info *notificationsProtocol,
// note: if this function is being called, it's being called via SetStreamHandler,
// ie it is an inbound stream and we only send the handshake over it.
// we do not send any other data over this stream, we would need to open a new outbound stream.
if _, has := info.getHandshakeData(peer, true); !has {
if _, has := info.getInboundHandshakeData(peer); !has {
logger.Trace("receiver: validating handshake", "protocol", info.protocolID)

hsData := newHandshakeData(true, false, stream)
Expand Down Expand Up @@ -211,7 +233,7 @@ func (s *Service) sendData(peer peer.ID, hs Handshake, info *notificationsProtoc
return
}

hsData, has := info.getHandshakeData(peer, false)
hsData, has := info.getOutboundHandshakeData(peer)
if has && !hsData.validated {
// peer has sent us an invalid handshake in the past, ignore
return
Expand Down
16 changes: 11 additions & 5 deletions dot/network/notifications_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package network
import (
"fmt"
"math/big"
"reflect"
"sync"
"testing"
"time"
"unsafe"

"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/lib/common"
Expand Down Expand Up @@ -89,7 +91,7 @@ func TestCreateDecoder_BlockAnnounce(t *testing.T) {
require.NoError(t, err)

// set handshake data to received
hsData, _ := info.getHandshakeData(testPeerID, true)
hsData, _ := info.getInboundHandshakeData(testPeerID)
hsData.received = true
info.inboundHandshakeData.Store(testPeerID, hsData)
msg, err = decoder(enc, testPeerID, true)
Expand Down Expand Up @@ -210,7 +212,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)

err = handler(stream, testHandshake)
require.Equal(t, errCannotValidateHandshake, err)
data, has := info.getHandshakeData(testPeerID, true)
data, has := info.getInboundHandshakeData(testPeerID)
require.True(t, has)
require.True(t, data.received)
require.False(t, data.validated)
Expand All @@ -227,7 +229,7 @@ func TestCreateNotificationsMessageHandler_BlockAnnounceHandshake(t *testing.T)

err = handler(stream, testHandshake)
require.NoError(t, err)
data, has = info.getHandshakeData(testPeerID, true)
data, has = info.getInboundHandshakeData(testPeerID)
require.True(t, has)
require.True(t, data.received)
require.True(t, data.validated)
Expand Down Expand Up @@ -293,7 +295,7 @@ func Test_HandshakeTimeout(t *testing.T) {
time.Sleep(time.Second)

// Verify that handshake data exists.
_, ok := info.getHandshakeData(nodeB.host.id(), false)
_, ok := info.getOutboundHandshakeData(nodeB.host.id())
require.True(t, ok)

// a stream should be open until timeout
Expand All @@ -305,11 +307,15 @@ func Test_HandshakeTimeout(t *testing.T) {
time.Sleep(handshakeTimeout)

// handshake data should be removed
_, ok = info.getHandshakeData(nodeB.host.id(), false)
_, ok = info.getOutboundHandshakeData(nodeB.host.id())
require.False(t, ok)

// stream should be closed
connAToB = nodeA.host.h.Network().ConnsToPeer(nodeB.host.id())
require.Len(t, connAToB, 1)
require.Len(t, connAToB[0].GetStreams(), 0)
}

func TestBlockAnnounceHandshakeSize(t *testing.T) {
require.Equal(t, unsafe.Sizeof(BlockAnnounceHandshake{}), reflect.TypeOf(BlockAnnounceHandshake{}).Size())
}
Loading