Skip to content

Commit

Permalink
P2P: Ensure intra-quorum connections and only relay to members
Browse files Browse the repository at this point in the history
  • Loading branch information
random-zebra committed Feb 15, 2022
1 parent 87e962a commit 839f14d
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 51 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ set(COMMON_SOURCES
./src/evo/specialtx_validation.cpp
./src/llmq/quorums_blockprocessor.cpp
./src/llmq/quorums_commitment.cpp
./src/llmq/quorums_connections.cpp
./src/llmq/quorums_debug.cpp
./src/llmq/quorums_dkgsessionhandler.cpp
./src/llmq/quorums_dkgsessionmgr.cpp
Expand Down
2 changes: 2 additions & 0 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ BITCOIN_CORE_H = \
flatdb.h \
llmq/quorums_blockprocessor.h \
llmq/quorums_commitment.h \
llmq/quorums_connections.h \
llmq/quorums_debug.h \
llmq/quorums_dkgsessionhandler.h \
llmq/quorums_dkgsessionmgr.h \
Expand Down Expand Up @@ -367,6 +368,7 @@ libbitcoin_server_a_SOURCES = \
evo/specialtx_validation.cpp \
llmq/quorums_blockprocessor.cpp \
llmq/quorums_commitment.cpp \
llmq/quorums_connections.cpp \
llmq/quorums_debug.cpp \
llmq/quorums_dkgsessionhandler.cpp \
llmq/quorums_dkgsessionmgr.cpp \
Expand Down
11 changes: 0 additions & 11 deletions src/evo/deterministicmns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "core_io.h"
#include "key_io.h"
#include "guiinterface.h"
#include "llmq/quorums_utils.h"
#include "masternodeman.h" // for mnodeman (!TODO: remove)
#include "script/standard.h"
#include "spork.h"
Expand Down Expand Up @@ -981,14 +980,4 @@ std::vector<CDeterministicMNCPtr> CDeterministicMNManager::GetAllQuorumMembers(C
return allMns.CalculateQuorum(params.size, modifier);
}

std::set<uint256> CDeterministicMNManager::GetQuorumRelayMembers(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum)
{
// !TODO: complete me
auto mns = GetAllQuorumMembers(llmqType, pindexQuorum);
std::set<uint256> result;
for (const auto& dmn : mns) {
result.emplace(dmn->proTxHash);
}
return result;
}

1 change: 0 additions & 1 deletion src/evo/deterministicmns.h
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,6 @@ class CDeterministicMNManager

// Get the list of members for a given quorum type and index
std::vector<CDeterministicMNCPtr> GetAllQuorumMembers(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum);
std::set<uint256> GetQuorumRelayMembers(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum);

private:
void CleanupCache(int nHeight);
Expand Down
4 changes: 2 additions & 2 deletions src/evo/mnauth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "consensus/validation.h"
#include "net.h" // for CSerializedNetMsg
#include "netmessagemaker.h"
#include "llmq/quorums_utils.h"
#include "llmq/quorums_connections.h"
#include "tiertwo/masternode_meta_manager.h"
#include "tiertwo/net_masternodes.h"
#include "tiertwo/tiertwo_sync_state.h"
Expand Down Expand Up @@ -142,7 +142,7 @@ bool CMNAuth::ProcessMessage(CNode* pnode, const std::string& strCommand, CDataS

if (pnode2->verifiedProRegTxHash == mnauth.proRegTxHash) {
if (fMasterNode) {
auto deterministicOutbound = llmq::utils::DeterministicOutboundConnection(activeMnInfo->proTxHash, mnauth.proRegTxHash);
auto deterministicOutbound = llmq::DeterministicOutboundConnection(activeMnInfo->proTxHash, mnauth.proRegTxHash);
LogPrint(BCLog::NET_MN, "CMNAuth::ProcessMessage -- Masternode %s has already verified as peer %d, deterministicOutbound=%s. peer=%d\n",
mnauth.proRegTxHash.ToString(), pnode2->GetId(), deterministicOutbound.ToString(), pnode->GetId());
if (deterministicOutbound == activeMnInfo->proTxHash) {
Expand Down
204 changes: 204 additions & 0 deletions src/llmq/quorums_connections.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright (c) 2018-2019 The Dash Core developers
// Copyright (c) 2022 The PIVX developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include "llmq/quorums_connections.h"

#include "evo/deterministicmns.h"
#include "net.h"
#include "tiertwo/masternode_meta_manager.h" // for g_mmetaman
#include "tiertwo/net_masternodes.h"
#include "validation.h"

namespace llmq
{


uint256 DeterministicOutboundConnection(const uint256& proTxHash1, const uint256& proTxHash2)
{
// We need to deterministically select who is going to initiate the connection. The naive way would be to simply
// return the min(proTxHash1, proTxHash2), but this would create a bias towards MNs with a numerically low
// hash. To fix this, we return the proTxHash that has the lowest value of:
// hash(min(proTxHash1, proTxHash2), max(proTxHash1, proTxHash2), proTxHashX)
// where proTxHashX is the proTxHash to compare
uint256 h1;
uint256 h2;
if (proTxHash1 < proTxHash2) {
h1 = ::SerializeHash(std::make_tuple(proTxHash1, proTxHash2, proTxHash1));
h2 = ::SerializeHash(std::make_tuple(proTxHash1, proTxHash2, proTxHash2));
} else {
h1 = ::SerializeHash(std::make_tuple(proTxHash2, proTxHash1, proTxHash1));
h2 = ::SerializeHash(std::make_tuple(proTxHash2, proTxHash1, proTxHash2));
}
if (h1 < h2) {
return proTxHash1;
}
return proTxHash2;
}

std::set<uint256> GetQuorumRelayMembers(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum, const uint256 &forMember, bool onlyOutbound)
{
auto mns = deterministicMNManager->GetAllQuorumMembers(llmqType, pindexQuorum);
std::set<uint256> result;

auto calcOutbound = [&](size_t i, const uint256 proTxHash) {
// Relay to nodes at indexes (i+2^k)%n, where
// k: 0..max(1, floor(log2(n-1))-1)
// n: size of the quorum/ring
std::set<uint256> r;
int gap = 1;
int gap_max = (int)mns.size() - 1;
int k = 0;
while ((gap_max >>= 1) || k <= 1) {
size_t idx = (i + gap) % mns.size();
auto& otherDmn = mns[idx];
if (otherDmn->proTxHash == proTxHash) {
continue;
}
r.emplace(otherDmn->proTxHash);
gap <<= 1;
k++;
}
return r;
};

for (size_t i = 0; i < mns.size(); i++) {
auto& dmn = mns[i];
if (dmn->proTxHash == forMember) {
auto r = calcOutbound(i, dmn->proTxHash);
result.insert(r.begin(), r.end());
} else if (!onlyOutbound) {
auto r = calcOutbound(i, dmn->proTxHash);
if (r.count(forMember)) {
result.emplace(dmn->proTxHash);
}
}
}

return result;
}

static std::set<uint256> GetQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, const uint256& forMember, bool onlyOutbound)
{
auto mns = deterministicMNManager->GetAllQuorumMembers(llmqType, pindexQuorum);
std::set<uint256> result;

for (auto& dmn : mns) {
if (dmn->proTxHash == forMember) {
continue;
}
// Determine which of the two MNs (forMember vs dmn) should initiate the outbound connection and which
// one should wait for the inbound connection. We do this in a deterministic way, so that even when we
// end up with both connecting to each other, we know which one to disconnect
uint256 deterministicOutbound = DeterministicOutboundConnection(forMember, dmn->proTxHash);
if (!onlyOutbound || deterministicOutbound == dmn->proTxHash) {
result.emplace(dmn->proTxHash);
}
}
return result;
}

std::set<size_t> CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, size_t memberCount, size_t connectionCount)
{
static uint256 qwatchConnectionSeed;
static std::atomic<bool> qwatchConnectionSeedGenerated{false};
static RecursiveMutex qwatchConnectionSeedCs;
if (!qwatchConnectionSeedGenerated) {
LOCK(qwatchConnectionSeedCs);
if (!qwatchConnectionSeedGenerated) {
qwatchConnectionSeed = GetRandHash();
qwatchConnectionSeedGenerated = true;
}
}

std::set<size_t> result;
uint256 rnd = qwatchConnectionSeed;
for (size_t i = 0; i < connectionCount; i++) {
rnd = ::SerializeHash(std::make_pair(rnd, std::make_pair(static_cast<uint8_t>(llmqType), pindexQuorum->GetBlockHash())));
result.emplace(rnd.GetUint64(0) % memberCount);
}
return result;
}

void EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, const uint256& myProTxHash)
{
auto members = deterministicMNManager->GetAllQuorumMembers(llmqType, pindexQuorum);
bool isMember = std::find_if(members.begin(), members.end(), [&](const CDeterministicMNCPtr& dmn) { return dmn->proTxHash == myProTxHash; }) != members.end();

if (!isMember) { // && !CLLMQUtils::IsWatchQuorumsEnabled()) {
return;
}

std::set<uint256> connections;
std::set<uint256> relayMembers;
if (isMember) {
connections = GetQuorumConnections(llmqType, pindexQuorum, myProTxHash, true);
relayMembers = GetQuorumRelayMembers(llmqType, pindexQuorum, myProTxHash, true);
} else {
auto cindexes = CalcDeterministicWatchConnections(llmqType, pindexQuorum, members.size(), 1);
for (auto idx : cindexes) {
connections.emplace(members[idx]->proTxHash);
}
relayMembers = connections;
}
if (!connections.empty()) {
auto connman = g_connman->GetTierTwoConnMan();
if (!connman->hasQuorumNodes(llmqType, pindexQuorum->GetBlockHash()) && LogAcceptCategory(BCLog::LLMQ)) {
auto mnList = deterministicMNManager->GetListAtChainTip();
std::string debugMsg = strprintf("CLLMQUtils::%s -- adding masternodes quorum connections for quorum %s:\n", __func__, pindexQuorum->GetBlockHash().ToString());
for (auto& c : connections) {
auto dmn = mnList.GetValidMN(c);
if (!dmn) {
debugMsg += strprintf(" %s (not in valid MN set anymore)\n", c.ToString());
} else {
debugMsg += strprintf(" %s (%s)\n", c.ToString(), dmn->pdmnState->addr.ToString());
}
}
LogPrint(BCLog::LLMQ, debugMsg.c_str()); /* Continued */
}
connman->setQuorumNodes(llmqType, pindexQuorum->GetBlockHash(), connections);
}
if (!relayMembers.empty()) {
auto connman = g_connman->GetTierTwoConnMan();
connman->setMasternodeQuorumRelayMembers(llmqType, pindexQuorum->GetBlockHash(), relayMembers);
}
}

void AddQuorumProbeConnections(Consensus::LLMQType llmqType, const CBlockIndex *pindexQuorum, const uint256 &myProTxHash)
{
auto members = deterministicMNManager->GetAllQuorumMembers(llmqType, pindexQuorum);
auto curTime = GetAdjustedTime();

std::set<uint256> probeConnections;
for (auto& dmn : members) {
if (dmn->proTxHash == myProTxHash) {
continue;
}
auto lastOutbound = g_mmetaman.GetMetaInfo(dmn->proTxHash)->GetLastOutboundSuccess();
// re-probe after 50 minutes so that the "good connection" check in the DKG doesn't fail just because we're on
// the brink of timeout
if (curTime - lastOutbound > 50 * 60) {
probeConnections.emplace(dmn->proTxHash);
}
}

if (!probeConnections.empty()) {
if (LogAcceptCategory(BCLog::LLMQ)) {
auto mnList = deterministicMNManager->GetListAtChainTip();
std::string debugMsg = strprintf("CLLMQUtils::%s -- adding masternodes probes for quorum %s:\n", __func__, pindexQuorum->GetBlockHash().ToString());
for (auto& c : probeConnections) {
auto dmn = mnList.GetValidMN(c);
if (!dmn) {
debugMsg += strprintf(" %s (not in valid MN set anymore)\n", c.ToString());
} else {
debugMsg += strprintf(" %s (%s)\n", c.ToString(), dmn->pdmnState->addr.ToString());
}
}
LogPrint(BCLog::LLMQ, debugMsg.c_str()); /* Continued */
}
g_connman->GetTierTwoConnMan()->addPendingProbeConnections(probeConnections);
}
}

} // namespace llmq
26 changes: 26 additions & 0 deletions src/llmq/quorums_connections.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2018-2019 The Dash Core developers
// Copyright (c) 2022 The PIVX developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#ifndef PIVX_QUORUMS_CONNECTIONS_H
#define PIVX_QUORUMS_CONNECTIONS_H

#include "consensus/params.h"

class CBlockIndex;

namespace llmq {

// Deterministically selects which node should initiate the mnauth process
uint256 DeterministicOutboundConnection(const uint256& proTxHash1, const uint256& proTxHash2);

std::set<uint256> GetQuorumRelayMembers(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, const uint256& forMember, bool onlyOutbound);
std::set<size_t> CalcDeterministicWatchConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, size_t memberCount, size_t connectionCount);

void EnsureQuorumConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, const uint256& myProTxHash);
void AddQuorumProbeConnections(Consensus::LLMQType llmqType, const CBlockIndex* pindexQuorum, const uint256& myProTxHash);

} // namespace llmq

#endif // PIVX_QUORUMS_CONNECTIONS_H
41 changes: 39 additions & 2 deletions src/llmq/quorums_dkgsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
#include "cxxtimer.hpp"
#include "evo/specialtx_validation.h"
#include "init.h"
#include "llmq/quorums_connections.h"
#include "llmq/quorums_commitment.h"
#include "llmq/quorums_debug.h"
#include "llmq/quorums_dkgsessionmgr.h"
#include "llmq/quorums_utils.h"
#include "net.h"
#include "netmessagemaker.h"
#include "spork.h"
#include "tiertwo/masternode_meta_manager.h"
#include "univalue.h"
#include "validation.h"

Expand Down Expand Up @@ -126,7 +127,7 @@ bool CDKGSession::Init(const CBlockIndex* _pindexQuorum, const std::vector<CDete
LogPrint(BCLog::DKG, "CDKGSession::%s: initialized as observer. mns=%d\n", __func__, mns.size());
} else {
quorumDKGDebugManager->InitLocalSessionStatus(params.type, pindexQuorum->GetBlockHash(), pindexQuorum->nHeight);
relayMembers = deterministicMNManager->GetQuorumRelayMembers(params.type, pindexQuorum);
relayMembers = GetQuorumRelayMembers(params.type, pindexQuorum, myProTxHash, true);
LogPrint(BCLog::DKG, "CDKGSession::%s: initialized as member. mns=%d\n", __func__, mns.size());
}

Expand Down Expand Up @@ -435,9 +436,45 @@ void CDKGSession::VerifyAndComplain(CDKGPendingMessages& pendingMessages)
logger.Batch("verified contributions. time=%d", t1.count());
logger.Flush();

VerifyConnectionAndMinProtoVersions();

SendComplaint(pendingMessages);
}

void CDKGSession::VerifyConnectionAndMinProtoVersions()
{
CDKGLogger logger(*this, __func__);

std::unordered_map<uint256, int, StaticSaltedHasher> protoMap;
g_connman->ForEachNode([&](const CNode* pnode) {
if (pnode->verifiedProRegTxHash.IsNull()) {
return;
}
protoMap.emplace(pnode->verifiedProRegTxHash, pnode->nVersion);
});

for (auto& m : members) {
if (m->dmn->proTxHash == myProTxHash) {
continue;
}

auto it = protoMap.find(m->dmn->proTxHash);
if (it == protoMap.end()) {
m->bad = true;
logger.Batch("%s is not connected to us", m->dmn->proTxHash.ToString());
} else if (it != protoMap.end() && it->second < MNAUTH_NODE_VER_VERSION) {
m->bad = true;
logger.Batch("%s does not have min proto version %d (has %d)", m->dmn->proTxHash.ToString(), MNAUTH_NODE_VER_VERSION, it->second);
}

auto lastOutbound = g_mmetaman.GetMetaInfo(m->dmn->proTxHash)->GetLastOutboundSuccess();
if (GetAdjustedTime() - lastOutbound > 60 * 60) {
m->bad = true;
logger.Batch("%s no outbound connection since %d seconds", m->dmn->proTxHash.ToString(), GetAdjustedTime() - lastOutbound);
}
}
}

void CDKGSession::SendComplaint(CDKGPendingMessages& pendingMessages)
{
CDKGLogger logger(*this, __func__);
Expand Down
1 change: 1 addition & 0 deletions src/llmq/quorums_dkgsession.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class CDKGSession

// Phase 2: complaint
void VerifyAndComplain(CDKGPendingMessages& pendingMessages);
void VerifyConnectionAndMinProtoVersions();
void SendComplaint(CDKGPendingMessages& pendingMessages);
bool PreVerifyMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan) const;
void ReceiveMessage(const uint256& hash, const CDKGComplaint& qc, bool& retBan);
Expand Down
Loading

0 comments on commit 839f14d

Please sign in to comment.