Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: handle empty wsPeer supplied to transaction handler #6195

Merged
merged 6 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type facadePeer struct {
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }
func (p *facadePeer) RoutingAddr() []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(p.id))
return buf
}

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
Expand Down
4 changes: 4 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (p *testUnicastPeer) GetAddress() string {

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
6 changes: 3 additions & 3 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectablePeer) (*util.ErlCapacityGuard, bool) {
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var isCMEnabled bool
var err error
Expand Down Expand Up @@ -715,11 +715,11 @@ func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []tran
}

// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool {
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectableAddressablePeer) bool {
// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return true
}
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectableAddressablePeer)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
13 changes: 12 additions & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type DisconnectablePeer interface {
GetNetwork() GossipNode
}

// DisconnectableAddressablePeer is a Peer with a long-living connection to a network that can be disconnected and has an IP address
type DisconnectableAddressablePeer interface {
DisconnectablePeer
IPAddressable
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// PeerOption allows users to specify a subset of peers to query
//
//msgp:ignore PeerOption
Expand Down Expand Up @@ -118,7 +129,7 @@ var outgoingMessagesBufferSize = int(

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender DisconnectablePeer
Sender DisconnectableAddressablePeer
Tag Tag
Data []byte
Err error
Expand Down
30 changes: 21 additions & 9 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,23 +943,35 @@
}
}

type gsPeer struct {
peerID peer.ID
net *P2PNetwork
}

func (p *gsPeer) GetNetwork() GossipNode {
return p.net

Check warning on line 952 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L951-L952

Added lines #L951 - L952 were not covered by tests
}

func (p *gsPeer) RoutingAddr() []byte {
return []byte(p.peerID)

Check warning on line 956 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L955-L956

Added lines #L955 - L956 were not covered by tests
cce marked this conversation as resolved.
Show resolved Hide resolved
}

// txTopicValidator calls txHandler to validate and process incoming transactions.
func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
var routingAddr [8]byte
n.wsPeersLock.Lock()
var wsp *wsPeer
var ok bool
if wsp, ok = n.wsPeers[peerID]; ok {
copy(routingAddr[:], wsp.RoutingAddr())
var sender DisconnectableAddressablePeer
if wsp, ok := n.wsPeers[peerID]; ok {
sender = wsp
} else {
// well, otherwise use last 8 bytes of peerID
copy(routingAddr[:], peerID[len(peerID)-8:])
// otherwise use the peerID to handle the case where this peer is not in the wsPeers map yet
// this can happen when pubsub receives new peer notifications before the wsStreamHandler is called:
// create a fake peer that is good enough for tx handler to work with.
sender = &gsPeer{peerID: peerID, net: n}
}
n.wsPeersLock.Unlock()

inmsg := IncomingMessage{
// Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr},
Sender: wsp,
Sender: sender,
Tag: protocol.TxnTag,
Data: msg.Data,
Net: n,
Expand Down
31 changes: 31 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -1374,3 +1375,33 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) {
require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

// TestP2PTxTopicValidator_NoWsPeer checks txTopicValidator does not call tx handler with empty Sender
func TestP2PTxTopicValidator_NoWsPeer(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses

net, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)

peerID := peer.ID("12345678") // must be 8+ in size
msg := pubsub.Message{Message: &pb.Message{}, ID: string(peerID)}
validateIncomingTxMessage := func(rawmsg IncomingMessage) OutgoingMessage {
require.NotEmpty(t, rawmsg.Sender)
require.Implements(t, (*DisconnectableAddressablePeer)(nil), rawmsg.Sender)
return OutgoingMessage{Action: Accept}
}
net.handler.RegisterValidatorHandlers([]TaggedMessageValidatorHandler{
{Tag: protocol.TxnTag, MessageHandler: ValidateHandleFunc(validateIncomingTxMessage)},
})

ctx := context.Background()
require.NotContains(t, net.wsPeers, peerID)
res := net.txTopicValidator(ctx, peerID, &msg)
require.Equal(t, pubsub.ValidationAccept, res)
}
6 changes: 3 additions & 3 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ const (
type broadcastRequest struct {
tags []Tag
data [][]byte
except *wsPeer
except Peer
done chan struct{}
enqueueTime time.Time
ctx context.Context
Expand Down Expand Up @@ -381,7 +381,7 @@ func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Ta

request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
if except != nil {
request.except = except.(*wsPeer)
request.except = except
}

broadcastQueue := wn.broadcastQueueBulk
Expand Down Expand Up @@ -1401,7 +1401,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit {
break
}
if peer == request.except {
if Peer(peer) == request.except {
cce marked this conversation as resolved.
Show resolved Hide resolved
continue
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
Expand Down
37 changes: 37 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4639,3 +4639,40 @@ func TestWebsocketNetworkHTTPClient(t *testing.T) {
_, err = netB.GetHTTPClient("invalid")
require.Error(t, err)
}

// TestPeerComparisonInBroadcast tests that the peer comparison in the broadcast function works as expected
// when casting wsPeer to Peer (interface{}) type.
func TestPeerComparisonInBroadcast(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

log := logging.TestingLog(t)
conf := config.GetDefaultLocal()
wn := &WebsocketNetwork{
log: log,
config: conf,
ctx: context.Background(),
}
wn.setup()

testPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "test-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
}
exceptPeer := &wsPeer{
wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "except-addr", nil, ""),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
}

request := broadcastRequest{
tags: []protocol.Tag{"test-tag"},
data: [][]byte{[]byte("test-data")},
enqueueTime: time.Now(),
except: exceptPeer,
}

wn.broadcaster.innerBroadcast(request, false, []*wsPeer{testPeer, exceptPeer})

require.Equal(t, 1, len(testPeer.sendBufferBulk))
require.Equal(t, 0, len(exceptPeer.sendBufferBulk))
}
12 changes: 7 additions & 5 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@

// closers is a slice of functions to run when the peer is closed
closers []func()
// closersMu synchronizes access to closers
closersMu deadlock.RWMutex
cce marked this conversation as resolved.
Show resolved Hide resolved

// peerType defines the peer's underlying connection type
// used for separate p2p vs ws metrics
Expand All @@ -295,11 +297,6 @@
GetHTTPClient() *http.Client
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
Expand Down Expand Up @@ -979,6 +976,8 @@
}

}
wp.closersMu.RLock()
defer wp.closersMu.RUnlock()
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// now call all registered closers
for _, f := range wp.closers {
f()
Expand Down Expand Up @@ -1115,6 +1114,9 @@
}

func (wp *wsPeer) OnClose(f func()) {
wp.closersMu.Lock()
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
defer wp.closersMu.Unlock()

Check warning on line 1118 in network/wsPeer.go

View check run for this annotation

Codecov / codecov/patch

network/wsPeer.go#L1117-L1118

Added lines #L1117 - L1118 were not covered by tests

if wp.closers == nil {
wp.closers = []func(){}
}
Expand Down
4 changes: 4 additions & 0 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (mup *mockUnicastPeer) GetNetwork() network.GossipNode {
panic("not implemented")
}

func (mup *mockUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

// TestHandleCatchupReqNegative covers the error reporting in handleCatchupReq
func TestHandleCatchupReqNegative(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down
Loading