diff --git a/agreement/fuzzer/networkFacade_test.go b/agreement/fuzzer/networkFacade_test.go index 804fb1e7ff..6a99ebb811 100644 --- a/agreement/fuzzer/networkFacade_test.go +++ b/agreement/fuzzer/networkFacade_test.go @@ -79,6 +79,11 @@ type facadePeer struct { } func (p *facadePeer) GetNetwork() network.GossipNode { return p.net } +func (p *facadePeer) RoutingAddr() []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(p.id)) + return buf +} // MakeNetworkFacade creates a facade with a given nodeID. func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade { diff --git a/catchup/fetcher_test.go b/catchup/fetcher_test.go index 85dcaba70b..3fa7a795fe 100644 --- a/catchup/fetcher_test.go +++ b/catchup/fetcher_test.go @@ -251,6 +251,10 @@ func (p *testUnicastPeer) GetAddress() string { func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn } +func (p *testUnicastPeer) RoutingAddr() []byte { + panic("not implemented") +} + func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) { responseChannel := make(chan *network.Response, 1) diff --git a/data/txHandler.go b/data/txHandler.go index ecea78f522..b5d66b6d35 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -627,7 +627,7 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool // Returns: // - the capacity guard returned by the elastic rate limiter // - a boolean indicating if the sender is rate limited -func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectablePeer) (*util.ErlCapacityGuard, bool) { +func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) { var capguard *util.ErlCapacityGuard var isCMEnabled bool var err error @@ -715,11 +715,11 @@ func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []tran } // incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter. -func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool { +func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectableAddressablePeer) bool { // rate limit per application in a group. Limiting any app in a group drops the entire message. if handler.appLimiter != nil { congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold - if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) { + if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.RoutingAddr()) { transactionMessagesAppLimiterDrop.Inc(nil) return true } diff --git a/network/connPerfMon_test.go b/network/connPerfMon_test.go index 560be72a96..5c4fbc542a 100644 --- a/network/connPerfMon_test.go +++ b/network/connPerfMon_test.go @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) { addMsg := func(msgCount int) { for i := 0; i < msgCount; i++ { - msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer) + msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectableAddressablePeer) timer += int64(7 * time.Nanosecond) msg.Received = timer out = append(out, msg) diff --git a/network/gossipNode.go b/network/gossipNode.go index 68a5e36d88..eec47a2850 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -33,6 +33,17 @@ type DisconnectablePeer interface { GetNetwork() GossipNode } +// DisconnectableAddressablePeer is a Peer with a long-living connection to a network that can be disconnected and has an IP address +type DisconnectableAddressablePeer interface { + DisconnectablePeer + IPAddressable +} + +// IPAddressable is addressable with either IPv4 or IPv6 address +type IPAddressable interface { + RoutingAddr() []byte +} + // PeerOption allows users to specify a subset of peers to query // //msgp:ignore PeerOption @@ -118,7 +129,7 @@ var outgoingMessagesBufferSize = int( // IncomingMessage represents a message arriving from some peer in our p2p network type IncomingMessage struct { - Sender DisconnectablePeer + Sender DisconnectableAddressablePeer Tag Tag Data []byte Err error diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 32b9a49ef3..9b818d6076 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -943,23 +943,35 @@ func (n *P2PNetwork) txTopicHandleLoop() { } } +type gsPeer struct { + peerID peer.ID + net *P2PNetwork +} + +func (p *gsPeer) GetNetwork() GossipNode { + return p.net +} + +func (p *gsPeer) RoutingAddr() []byte { + return []byte(p.peerID) +} + // txTopicValidator calls txHandler to validate and process incoming transactions. func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult { - var routingAddr [8]byte n.wsPeersLock.Lock() - var wsp *wsPeer - var ok bool - if wsp, ok = n.wsPeers[peerID]; ok { - copy(routingAddr[:], wsp.RoutingAddr()) + var sender DisconnectableAddressablePeer + if wsp, ok := n.wsPeers[peerID]; ok { + sender = wsp } else { - // well, otherwise use last 8 bytes of peerID - copy(routingAddr[:], peerID[len(peerID)-8:]) + // otherwise use the peerID to handle the case where this peer is not in the wsPeers map yet + // this can happen when pubsub receives new peer notifications before the wsStreamHandler is called: + // create a fake peer that is good enough for tx handler to work with. + sender = &gsPeer{peerID: peerID, net: n} } n.wsPeersLock.Unlock() inmsg := IncomingMessage{ - // Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr}, - Sender: wsp, + Sender: sender, Tag: protocol.TxnTag, Data: msg.Data, Net: n, diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 51e4fc7bf2..6379dea801 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -42,6 +42,7 @@ import ( "github.com/algorand/go-algorand/test/partitiontest" pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -1374,3 +1375,33 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) { require.False(t, netA.hasPeers()) require.False(t, netB.hasPeers()) } + +// TestP2PTxTopicValidator_NoWsPeer checks txTopicValidator does not call tx handler with empty Sender +func TestP2PTxTopicValidator_NoWsPeer(t *testing.T) { + partitiontest.PartitionTest(t) + + log := logging.TestingLog(t) + + // prepare configs + cfg := config.GetDefaultLocal() + cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses + + net, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil) + require.NoError(t, err) + + peerID := peer.ID("12345678") // must be 8+ in size + msg := pubsub.Message{Message: &pb.Message{}, ID: string(peerID)} + validateIncomingTxMessage := func(rawmsg IncomingMessage) OutgoingMessage { + require.NotEmpty(t, rawmsg.Sender) + require.Implements(t, (*DisconnectableAddressablePeer)(nil), rawmsg.Sender) + return OutgoingMessage{Action: Accept} + } + net.handler.RegisterValidatorHandlers([]TaggedMessageValidatorHandler{ + {Tag: protocol.TxnTag, MessageHandler: ValidateHandleFunc(validateIncomingTxMessage)}, + }) + + ctx := context.Background() + require.NotContains(t, net.wsPeers, peerID) + res := net.txTopicValidator(ctx, peerID, &msg) + require.Equal(t, pubsub.ValidationAccept, res) +} diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 675e425573..29c75067eb 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -284,7 +284,7 @@ const ( type broadcastRequest struct { tags []Tag data [][]byte - except *wsPeer + except Peer done chan struct{} enqueueTime time.Time ctx context.Context @@ -381,7 +381,7 @@ func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Ta request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx} if except != nil { - request.except = except.(*wsPeer) + request.except = except } broadcastQueue := wn.broadcastQueueBulk @@ -1401,7 +1401,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit { break } - if peer == request.except { + if Peer(peer) == request.except { continue } ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 1a355e5250..4df4032e42 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -4639,3 +4639,40 @@ func TestWebsocketNetworkHTTPClient(t *testing.T) { _, err = netB.GetHTTPClient("invalid") require.Error(t, err) } + +// TestPeerComparisonInBroadcast tests that the peer comparison in the broadcast function works as expected +// when casting wsPeer to Peer (interface{}) type. +func TestPeerComparisonInBroadcast(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + log := logging.TestingLog(t) + conf := config.GetDefaultLocal() + wn := &WebsocketNetwork{ + log: log, + config: conf, + ctx: context.Background(), + } + wn.setup() + + testPeer := &wsPeer{ + wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "test-addr", nil, ""), + sendBufferBulk: make(chan sendMessages, sendBufferLength), + } + exceptPeer := &wsPeer{ + wsPeerCore: makePeerCore(wn.ctx, wn, log, nil, "except-addr", nil, ""), + sendBufferBulk: make(chan sendMessages, sendBufferLength), + } + + request := broadcastRequest{ + tags: []protocol.Tag{"test-tag"}, + data: [][]byte{[]byte("test-data")}, + enqueueTime: time.Now(), + except: exceptPeer, + } + + wn.broadcaster.innerBroadcast(request, false, []*wsPeer{testPeer, exceptPeer}) + + require.Equal(t, 1, len(testPeer.sendBufferBulk)) + require.Equal(t, 0, len(exceptPeer.sendBufferBulk)) +} diff --git a/network/wsPeer.go b/network/wsPeer.go index 2b30a17486..64a60fc475 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -282,6 +282,8 @@ type wsPeer struct { // closers is a slice of functions to run when the peer is closed closers []func() + // closersMu synchronizes access to closers + closersMu deadlock.RWMutex // peerType defines the peer's underlying connection type // used for separate p2p vs ws metrics @@ -295,11 +297,6 @@ type HTTPPeer interface { GetHTTPClient() *http.Client } -// IPAddressable is addressable with either IPv4 or IPv6 address -type IPAddressable interface { - RoutingAddr() []byte -} - // UnicastPeer is another possible interface for the opaque Peer. // It is possible that we can only initiate a connection to a peer over websockets. type UnicastPeer interface { @@ -979,6 +976,8 @@ L: } } + wp.closersMu.RLock() + defer wp.closersMu.RUnlock() // now call all registered closers for _, f := range wp.closers { f() @@ -1115,6 +1114,9 @@ func (wp *wsPeer) sendMessagesOfInterest(messagesOfInterestGeneration uint32, me } func (wp *wsPeer) OnClose(f func()) { + wp.closersMu.Lock() + defer wp.closersMu.Unlock() + if wp.closers == nil { wp.closers = []func(){} } diff --git a/rpcs/blockService_test.go b/rpcs/blockService_test.go index 83cfd94ef9..1b7e3ebcdc 100644 --- a/rpcs/blockService_test.go +++ b/rpcs/blockService_test.go @@ -76,6 +76,10 @@ func (mup *mockUnicastPeer) GetNetwork() network.GossipNode { panic("not implemented") } +func (mup *mockUnicastPeer) RoutingAddr() []byte { + panic("not implemented") +} + // TestHandleCatchupReqNegative covers the error reporting in handleCatchupReq func TestHandleCatchupReqNegative(t *testing.T) { partitiontest.PartitionTest(t)