From cc567b6adaa66697c112fb14e8951743a0eb344d Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Thu, 21 Nov 2024 10:47:26 -0800 Subject: [PATCH] Cleanup peers --- src/overlay/OverlayManager.h | 2 +- src/overlay/OverlayManagerImpl.cpp | 2 ++ src/overlay/OverlayMetrics.cpp | 4 --- src/overlay/OverlayMetrics.h | 2 -- src/overlay/Peer.cpp | 48 ++++++++++-------------------- src/overlay/Peer.h | 4 +-- src/overlay/test/LoopbackPeer.h | 1 + src/overlay/test/OverlayTests.cpp | 45 +++++++++++++++++++++++++++- src/overlay/test/TCPPeerTests.cpp | 10 +++---- src/test/TestUtils.cpp | 17 ----------- src/test/TestUtils.h | 3 -- 11 files changed, 69 insertions(+), 69 deletions(-) diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h index 0da2578d2d..8b5b6bf912 100644 --- a/src/overlay/OverlayManager.h +++ b/src/overlay/OverlayManager.h @@ -23,7 +23,7 @@ * The `StellarMessage` union contains 3 logically distinct kinds of message: * * - Messages directed to or from a specific peer, with or without a response: - * HELLO, GET_PEERS, PEERS, DONT_HAVE, ERROR_MSG + * HELLO, PEERS, DONT_HAVE, ERROR_MSG * * - One-way broadcast messages informing other peers of an event: * TRANSACTION and SCP_MESSAGE diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp index 8e3a3e226e..7cfbdd5d90 100644 --- a/src/overlay/OverlayManagerImpl.cpp +++ b/src/overlay/OverlayManagerImpl.cpp @@ -1161,8 +1161,10 @@ bool OverlayManagerImpl::checkScheduledAndCache( std::shared_ptr tracker) { +#ifndef BUILD_TESTS releaseAssert(!threadIsMain() || !mApp.getConfig().BACKGROUND_OVERLAY_PROCESSING); +#endif if (!tracker->maybeGetHash()) { return false; diff --git a/src/overlay/OverlayMetrics.cpp b/src/overlay/OverlayMetrics.cpp index 6125acae76..ea2c8db944 100644 --- a/src/overlay/OverlayMetrics.cpp +++ b/src/overlay/OverlayMetrics.cpp @@ -43,8 +43,6 @@ OverlayMetrics::OverlayMetrics(Application& app) , mRecvAuthTimer(app.getMetrics().NewTimer({"overlay", "recv", "auth"})) , mRecvDontHaveTimer( app.getMetrics().NewTimer({"overlay", "recv", "dont-have"})) - , mRecvGetPeersTimer( - app.getMetrics().NewTimer({"overlay", "recv", "get-peers"})) , mRecvPeersTimer(app.getMetrics().NewTimer({"overlay", "recv", "peers"})) , mRecvGetTxSetTimer( app.getMetrics().NewTimer({"overlay", "recv", "get-txset"})) @@ -114,8 +112,6 @@ OverlayMetrics::OverlayMetrics(Application& app) app.getMetrics().NewMeter({"overlay", "send", "auth"}, "message")) , mSendDontHaveMeter(app.getMetrics().NewMeter( {"overlay", "send", "dont-have"}, "message")) - , mSendGetPeersMeter(app.getMetrics().NewMeter( - {"overlay", "send", "get-peers"}, "message")) , mSendPeersMeter( app.getMetrics().NewMeter({"overlay", "send", "peers"}, "message")) , mSendGetTxSetMeter(app.getMetrics().NewMeter( diff --git a/src/overlay/OverlayMetrics.h b/src/overlay/OverlayMetrics.h index 64cb9dadc8..691a3d0b54 100644 --- a/src/overlay/OverlayMetrics.h +++ b/src/overlay/OverlayMetrics.h @@ -44,7 +44,6 @@ struct OverlayMetrics medida::Timer& mRecvHelloTimer; medida::Timer& mRecvAuthTimer; medida::Timer& mRecvDontHaveTimer; - medida::Timer& mRecvGetPeersTimer; medida::Timer& mRecvPeersTimer; medida::Timer& mRecvGetTxSetTimer; medida::Timer& mRecvTxSetTimer; @@ -84,7 +83,6 @@ struct OverlayMetrics medida::Meter& mSendHelloMeter; medida::Meter& mSendAuthMeter; medida::Meter& mSendDontHaveMeter; - medida::Meter& mSendGetPeersMeter; medida::Meter& mSendPeersMeter; medida::Meter& mSendGetTxSetMeter; medida::Meter& mSendTransactionMeter; diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index 2ffb4e830b..c331c8d170 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -524,17 +524,6 @@ Peer::sendGetQuorumSet(uint256 const& setID) sendMessage(msgPtr); } -void -Peer::sendGetPeers() -{ - ZoneScoped; - releaseAssert(threadIsMain()); - StellarMessage newMsg; - newMsg.type(GET_PEERS); - auto msgPtr = std::make_shared(newMsg); - sendMessage(msgPtr); -} - void Peer::sendGetScpState(uint32 ledgerSeq) { @@ -624,8 +613,6 @@ Peer::msgSummary(StellarMessage const& msg) case DONT_HAVE: return fmt::format(FMT_STRING("DONTHAVE {}:{}"), msg.dontHave().type, hexAbbrev(msg.dontHave().reqHash)); - case GET_PEERS: - return "GETPEERS"; case PEERS: return fmt::format(FMT_STRING("PEERS {:d}"), msg.peers().size()); @@ -713,9 +700,6 @@ Peer::sendMessage(std::shared_ptr msg, bool log) case DONT_HAVE: mOverlayMetrics.mSendDontHaveMeter.Mark(); break; - case GET_PEERS: - mOverlayMetrics.mSendGetPeersMeter.Mark(); - break; case PEERS: mOverlayMetrics.mSendPeersMeter.Mark(); break; @@ -927,7 +911,6 @@ Peer::recvAuthenticatedMessage(AuthenticatedMessage&& msg) cat = AUTH_ACTION_QUEUE; break; // control messages - case GET_PEERS: case PEERS: case ERROR_MSG: case SEND_MORE: @@ -1093,6 +1076,13 @@ Peer::recvRawMessage(std::shared_ptr msgTracker) return; } + if (stellarMsg.type() == PEERS && getRole() == REMOTE_CALLED_US) + { + drop(fmt::format("received {}", stellarMsg.type()), + Peer::DropDirection::WE_DROPPED_REMOTE); + return; + } + releaseAssert(isAuthenticated(guard) || stellarMsg.type() == HELLO || stellarMsg.type() == AUTH || stellarMsg.type() == ERROR_MSG); @@ -1130,13 +1120,6 @@ Peer::recvRawMessage(std::shared_ptr msgTracker) } break; - case GET_PEERS: - { - auto t = mOverlayMetrics.mRecvGetPeersTimer.TimeScope(); - recvGetPeers(stellarMsg); - } - break; - case PEERS: { auto t = mOverlayMetrics.mRecvPeersTimer.TimeScope(); @@ -1804,21 +1787,20 @@ Peer::recvAuth(StellarMessage const& msg) sendGetScpState(low); } -void -Peer::recvGetPeers(StellarMessage const& msg) -{ - ZoneScoped; - releaseAssert(threadIsMain()); - - sendPeers(); -} - void Peer::recvPeers(StellarMessage const& msg) { ZoneScoped; releaseAssert(threadIsMain()); + if (mPeersReceived) + { + drop(fmt::format("too many msgs {}", msg.type()), + Peer::DropDirection::WE_DROPPED_REMOTE); + return; + } + mPeersReceived = true; + for (auto const& peer : msg.peers()) { if (peer.port == 0 || peer.port > UINT16_MAX) diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index 327e6f37ad..53426d53ac 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -72,7 +72,6 @@ class Peer : public std::enable_shared_from_this, std::chrono::milliseconds(1); static constexpr std::chrono::nanoseconds PEER_METRICS_RATE_UNIT = std::chrono::seconds(1); - static constexpr uint32_t FIRST_VERSION_REQUIRED_FOR_PROTOCOL_20 = 32; // The reporting will be based on the previous // PEER_METRICS_WINDOW_SIZE-second time window. @@ -262,6 +261,7 @@ class Peer : public std::enable_shared_from_this, std::shared_ptr mTxAdverts; QueryInfo mQSetQueryInfo; QueryInfo mTxSetQueryInfo; + bool mPeersReceived{false}; static Hash pingIDfromTimePoint(VirtualClock::time_point const& tp); void pingPeer(); @@ -276,7 +276,6 @@ class Peer : public std::enable_shared_from_this, void updatePeerRecordAfterAuthentication(); void recvAuth(StellarMessage const& msg); void recvDontHave(StellarMessage const& msg); - void recvGetPeers(StellarMessage const& msg); void recvHello(Hello const& elo); void recvPeers(StellarMessage const& msg); void recvSurveyRequestMessage(StellarMessage const& msg); @@ -343,7 +342,6 @@ class Peer : public std::enable_shared_from_this, std::string msgSummary(StellarMessage const& stellarMsg); void sendGetTxSet(uint256 const& setID); void sendGetQuorumSet(uint256 const& setID); - void sendGetPeers(); void sendGetScpState(uint32 ledgerSeq); void sendErrorAndDrop(ErrorCode error, std::string const& message); void sendTxDemand(TxDemandVector&& demands); diff --git a/src/overlay/test/LoopbackPeer.h b/src/overlay/test/LoopbackPeer.h index 4282661a4b..0f2c40551b 100644 --- a/src/overlay/test/LoopbackPeer.h +++ b/src/overlay/test/LoopbackPeer.h @@ -156,6 +156,7 @@ class LoopbackPeer : public Peer using Peer::sendAuth; using Peer::sendAuthenticatedMessage; using Peer::sendMessage; + using Peer::sendPeers; friend class LoopbackPeerConnection; }; diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index f394f39cb8..9c04a429a1 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -770,6 +770,49 @@ TEST_CASE("failed auth", "[overlay][connections]") testutil::shutdownWorkScheduler(*app1); } +TEST_CASE("peers during auth", "[overlay][connections]") +{ + VirtualClock clock; + Config const& cfg1 = getTestConfig(0); + Config const& cfg2 = getTestConfig(1); + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + // Put a peer into Acceptor's DB to trigger sending of peers during auth + app2->getOverlayManager().getPeerManager().ensureExists( + PeerBareAddress{"1.1.1.1", 11625}); + + LoopbackPeerConnection conn(*app1, *app2); + testutil::crankSome(clock); + + REQUIRE(conn.getInitiator()->isAuthenticatedForTesting()); + REQUIRE(conn.getAcceptor()->isAuthenticatedForTesting()); + + StellarMessage newMsg; + newMsg.type(PEERS); + std::string dropReason; + SECTION("inbound") + { + dropReason = "received PEERS"; + conn.getInitiator()->sendMessage( + std::make_shared(newMsg)); + } + SECTION("outbound") + { + dropReason = "too many msgs PEERS"; + conn.getAcceptor()->sendMessage( + std::make_shared(newMsg)); + } + + testutil::crankFor(clock, std::chrono::seconds(1)); + + REQUIRE(!conn.getInitiator()->isConnectedForTesting()); + REQUIRE(!conn.getAcceptor()->isConnectedForTesting()); + REQUIRE(conn.getAcceptor()->getDropReason() == dropReason); + + testutil::shutdownWorkScheduler(*app2); + testutil::shutdownWorkScheduler(*app1); +} + TEST_CASE("outbound queue filtering", "[overlay][connections]") { auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE); @@ -1773,7 +1816,7 @@ TEST_CASE("drop peers who straggle", "[overlay][connections][straggler]") sendTimer.async_wait([straggler](asio::error_code const& error) { if (!error) { - straggler->sendGetPeers(); + straggler->sendGetTxSet(Hash()); } }); testutil::crankFor(clock, dur); diff --git a/src/overlay/test/TCPPeerTests.cpp b/src/overlay/test/TCPPeerTests.cpp index c8b35c62d7..99eae5a4ac 100644 --- a/src/overlay/test/TCPPeerTests.cpp +++ b/src/overlay/test/TCPPeerTests.cpp @@ -139,20 +139,20 @@ TEST_CASE("TCPPeer can communicate", "[overlay]") s->stopOverlayTick(); // Now drop peer, ensure ERROR containing "drop reason" is properly flushed - auto& recvGetPeers = - n1->getOverlayManager().getOverlayMetrics().mRecvGetPeersTimer; + auto& recvGetTxSet = + n1->getOverlayManager().getOverlayMetrics().mRecvGetTxSetTimer; auto& recvError = n1->getOverlayManager().getOverlayMetrics().mRecvErrorTimer; - auto prevPeers = recvGetPeers.count(); + auto prevTxSet = recvGetTxSet.count(); auto prevError = recvError.count(); - p0->sendGetPeers(); + p0->sendGetTxSet(Hash()); p0->sendErrorAndDrop(ERR_MISC, "test drop"); s->crankForAtLeast(std::chrono::seconds(1), false); REQUIRE(!p0->isConnectedForTesting()); REQUIRE(!p1->isConnectedForTesting()); // p1 must have received getPeers, Error - REQUIRE(recvGetPeers.count() == prevPeers + 1); + REQUIRE(recvGetTxSet.count() == prevTxSet + 1); REQUIRE(recvError.count() == prevError + 1); s->stopAllNodes(); } diff --git a/src/test/TestUtils.cpp b/src/test/TestUtils.cpp index cd5f655f77..b351112bd8 100644 --- a/src/test/TestUtils.cpp +++ b/src/test/TestUtils.cpp @@ -50,23 +50,6 @@ shutdownWorkScheduler(Application& app) } } -void -injectSendPeersAndReschedule(VirtualClock::time_point& end, VirtualClock& clock, - VirtualTimer& timer, - LoopbackPeerConnection& connection) -{ - connection.getInitiator()->sendGetPeers(); - if (clock.now() < end && connection.getInitiator()->isConnectedForTesting()) - { - timer.expires_from_now(std::chrono::milliseconds(10)); - timer.async_wait( - [&]() { - injectSendPeersAndReschedule(end, clock, timer, connection); - }, - &VirtualTimer::onFailureNoop); - } -} - std::vector getInvalidAssets(SecretKey const& issuer) { diff --git a/src/test/TestUtils.h b/src/test/TestUtils.h index 83f3e6d4f9..f134956125 100644 --- a/src/test/TestUtils.h +++ b/src/test/TestUtils.h @@ -22,9 +22,6 @@ namespace testutil { void crankSome(VirtualClock& clock); void crankFor(VirtualClock& clock, VirtualClock::duration duration); -void injectSendPeersAndReschedule(VirtualClock::time_point& end, - VirtualClock& clock, VirtualTimer& timer, - LoopbackPeerConnection& connection); void shutdownWorkScheduler(Application& app);