Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize guts / indexes loading for faster startup #85

Merged
merged 7 commits into from
Nov 8, 2024
Merged
110 changes: 22 additions & 88 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5743,86 +5743,6 @@ static bool IsSuperMajority(int minVersion, const CBlockIndex* pstart, unsigned
return (nFound >= nRequired);
}

CBlockIndex *komodo_ensure(CBlock *pblock, uint256 hash)
{
CBlockIndex *pindex = 0;
BlockMap::iterator miSelf = mapBlockIndex.find(hash);
if ( miSelf != mapBlockIndex.end() )
{
if ( (pindex = miSelf->second) == 0 ) // create pindex so first Accept block doesnt fail
{
miSelf->second = AddToBlockIndex(*pblock);
//LogPrintf("Block header %s is already known, but without pindex -> ensured %p\n",hash.ToString().c_str(),miSelf->second);
}
/*if ( hash != Params().GetConsensus().hashGenesisBlock )
{
miSelf = mapBlockIndex.find(pblock->hashPrevBlock);
if ( miSelf != mapBlockIndex.end() )
{
if ( miSelf->second == 0 )
{
miSelf->second = InsertBlockIndex(pblock->hashPrevBlock);
LogPrintf("autocreate previndex %s\n",pblock->hashPrevBlock.ToString().c_str());
}
}
}*/
}
return(pindex);
}

CBlockIndex *oldkomodo_ensure(CBlock *pblock, uint256 hash)
{
CBlockIndex *pindex=0,*previndex=0;
if ( (pindex = komodo_getblockindex(hash)) == 0 )
{
pindex = new CBlockIndex();
if (!pindex)
throw runtime_error("komodo_ensure: new CBlockIndex failed");
BlockMap::iterator mi = mapBlockIndex.insert(make_pair(hash, pindex)).first;
pindex->phashBlock = &((*mi).first);
}
BlockMap::iterator miSelf = mapBlockIndex.find(hash);
if ( miSelf == mapBlockIndex.end() )
{
LogPrintf("komodo_ensure unexpected missing hash %s\n",hash.ToString().c_str());
return(0);
}
if ( miSelf->second == 0 ) // create pindex so first Accept block doesnt fail
{
if ( pindex == 0 )
{
pindex = AddToBlockIndex(*pblock);
LogPrintf("ensure call addtoblockindex, got %p\n",pindex);
}
if ( pindex != 0 )
{
miSelf->second = pindex;
LogPrintf("Block header %s is already known, but without pindex -> ensured %p\n",hash.ToString().c_str(),miSelf->second);
} else LogPrintf("komodo_ensure unexpected null pindex\n");
}
/*if ( hash != Params().GetConsensus().hashGenesisBlock )
{
miSelf = mapBlockIndex.find(pblock->hashPrevBlock);
if ( miSelf == mapBlockIndex.end() )
previndex = InsertBlockIndex(pblock->hashPrevBlock);
if ( (miSelf= mapBlockIndex.find(pblock->hashPrevBlock)) != mapBlockIndex.end() )
{
if ( miSelf->second == 0 ) // create pindex so first Accept block doesnt fail
{
if ( previndex == 0 )
previndex = InsertBlockIndex(pblock->hashPrevBlock);
if ( previndex != 0 )
{
miSelf->second = previndex;
LogPrintf("autocreate previndex %s\n",pblock->hashPrevBlock.ToString().c_str());
} else LogPrintf("komodo_ensure unexpected null previndex\n");
}
} else LogPrintf("komodo_ensure unexpected null miprev\n");
}
}*/
return(pindex);
}

/*****
* @brief Process a new block
* @note can come from the network or locally mined
Expand Down Expand Up @@ -6169,8 +6089,14 @@ bool static LoadBlockIndexDB()
{
const CChainParams& chainparams = Params();
LogPrintf("%s: start loading guts\n", __func__);
if (!pblocktree->LoadBlockIndexGuts())
return false;
if (GetBoolArg("-fastguts", false)) {
/* experimental: faster load, but x2 memory consumption */
if (!pblocktree->LoadBlockIndexGutsFast())
return false;
} else {
if (!pblocktree->LoadBlockIndexGuts())
return false;
}
LogPrintf("%s: loaded guts\n", __func__);
boost::this_thread::interruption_point();

Expand All @@ -6180,7 +6106,7 @@ bool static LoadBlockIndexDB()
// Calculate nChainWork
vector<pair<int, CBlockIndex*> > vSortedByHeight;
vSortedByHeight.reserve(mapBlockIndex.size());
BOOST_FOREACH(const PAIRTYPE(uint256, CBlockIndex*)& item, mapBlockIndex)
for (const std::pair<uint256, CBlockIndex*>& item : mapBlockIndex)
{
CBlockIndex* pindex = item.second;
vSortedByHeight.push_back(make_pair(pindex->nHeight, pindex));
Expand All @@ -6193,9 +6119,12 @@ bool static LoadBlockIndexDB()
uiInterface.ShowProgress(_("Loading block index DB..."), 0, false);
int cur_height_num = 0;

BOOST_FOREACH(const PAIRTYPE(int, CBlockIndex*)& item, vSortedByHeight)
const size_t totalBlocks = vSortedByHeight.size();
const size_t updateInterval = totalBlocks / 100;

for (const std::pair<int, CBlockIndex*>& item : vSortedByHeight)
{
boost::this_thread::interruption_point();
if (ShutdownRequested()) return false;

CBlockIndex* pindex = item.second;
pindex->nChainWork = (pindex->pprev ? pindex->pprev->nChainWork : 0) + GetBlockProof(*pindex);
Expand Down Expand Up @@ -6275,7 +6204,12 @@ bool static LoadBlockIndexDB()
if (pindex->IsValid(BLOCK_VALID_TREE) && (pindexBestHeader == NULL || CBlockIndexWorkComparator()(pindexBestHeader, pindex)))
pindexBestHeader = pindex;
//komodo_pindex_init(pindex,(int32_t)pindex->nHeight);
uiInterface.ShowProgress(_("Loading block index DB..."), (int)((double)(cur_height_num*100)/(double)(vSortedByHeight.size())), false);
if (cur_height_num % updateInterval == 0 || cur_height_num == totalBlocks - 1)
{
int progress = static_cast<int>((static_cast<double>(cur_height_num) / totalBlocks) * 100);
uiInterface.ShowProgress(_("Loading block index DB..."), progress, false);
// uiInterface.ShowProgress(_("Loading block index DB..."), (int)((double)(cur_height_num*100)/(double)(vSortedByHeight.size())), false);
}
cur_height_num++;
}

Expand Down Expand Up @@ -6304,7 +6238,7 @@ bool static LoadBlockIndexDB()
// Check presence of blk files
LogPrintf("Checking all blk files are present...\n");
set<int> setBlkDataFiles;
BOOST_FOREACH(const PAIRTYPE(uint256, CBlockIndex*)& item, mapBlockIndex)
for (const std::pair<uint256, CBlockIndex*>& item : mapBlockIndex)
{
CBlockIndex* pindex = item.second;
if (pindex->nStatus & BLOCK_HAVE_DATA) {
Expand Down Expand Up @@ -6359,7 +6293,7 @@ bool static LoadBlockIndexDB()
LogPrintf("%s: spent index %s\n", __func__, fSpentIndex ? "enabled" : "disabled");

// Fill in-memory data
BOOST_FOREACH(const PAIRTYPE(uint256, CBlockIndex*)& item, mapBlockIndex)
for (const std::pair<uint256, CBlockIndex*>& item : mapBlockIndex)
{
CBlockIndex* pindex = item.second;
// - This relationship will always be true even if pprev has multiple
Expand Down
2 changes: 2 additions & 0 deletions src/main.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ static const unsigned int UNDOFILE_CHUNK_SIZE = 0x100000; // 1 MiB
static const int MAX_SCRIPTCHECK_THREADS = 16;
/** -par default (number of script-checking threads, 0 = auto) */
static const int DEFAULT_SCRIPTCHECK_THREADS = 0;
/** Maximum of loading-guts thread pool threads allowed */
static const int MAX_LOADING_GUTS_THREADS = 4;
/** Number of blocks that can be requested at any given time from a single peer. */
static const int MAX_BLOCKS_IN_TRANSIT_PER_PEER = 16;
/** Timeout in seconds during which a peer must stall block download progress before being disconnected. */
Expand Down
156 changes: 156 additions & 0 deletions src/txdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@

#include <boost/thread.hpp>

#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>

using namespace std;

// NOTE: Per issue #3277, do not use the prefix 'X' or 'x' as they were
Expand Down Expand Up @@ -804,3 +810,153 @@ bool CBlockTreeDB::LoadBlockIndexGuts()

return true;
}

bool CBlockTreeDB::LoadBlockIndexGutsFast()
{
std::unique_ptr<CDBIterator> pcursor(NewIterator());

pcursor->Seek(make_pair(DB_BLOCK_INDEX, uint256()));
int reportDone = 0;
uiInterface.ShowProgress(_("Loading guts..."), 0, false);

// Local synchronization primitives and shared resources
std::mutex mapBlockIndex_mutex;
std::mutex queue_mutex;
std::condition_variable queue_cond_var;
std::queue<CDiskBlockIndex> task_queue;
std::atomic<bool> done(false);

// Lambda for worker threads
auto lblkIdx_worker = [&mapBlockIndex_mutex, &queue_mutex, &queue_cond_var, &task_queue, &done]() {
while (true) {
CDiskBlockIndex diskindex;
{
std::unique_lock<std::mutex> lock(queue_mutex);
queue_cond_var.wait(lock, [&task_queue, &done]() { return !task_queue.empty() || done.load(); });

if (done.load() && task_queue.empty())
return;

diskindex = task_queue.front();
task_queue.pop();
}

std::lock_guard<std::mutex> map_lock(mapBlockIndex_mutex);
// Process diskindex
CBlockIndex* pindexNew = InsertBlockIndex(diskindex.GetBlockHash());
pindexNew->pprev = InsertBlockIndex(diskindex.hashPrev);
pindexNew->nHeight = diskindex.nHeight;
pindexNew->nFile = diskindex.nFile;
pindexNew->nDataPos = diskindex.nDataPos;
pindexNew->nUndoPos = diskindex.nUndoPos;
pindexNew->hashSproutAnchor = diskindex.hashSproutAnchor;
pindexNew->nVersion = diskindex.nVersion;
pindexNew->hashMerkleRoot = diskindex.hashMerkleRoot;
pindexNew->hashFinalSaplingRoot = diskindex.hashFinalSaplingRoot;
pindexNew->nTime = diskindex.nTime;
pindexNew->nBits = diskindex.nBits;
pindexNew->nNonce = diskindex.nNonce;
// the Equihash solution will be loaded lazily from the dbindex entry
pindexNew->nStatus = diskindex.nStatus;
pindexNew->nCachedBranchId = diskindex.nCachedBranchId;
pindexNew->nTx = diskindex.nTx;
pindexNew->nChainSupplyDelta = diskindex.nChainSupplyDelta;
pindexNew->nTransparentValue = diskindex.nTransparentValue;
pindexNew->nBurnedAmountDelta = diskindex.nBurnedAmountDelta;
pindexNew->nSproutValue = diskindex.nSproutValue;
pindexNew->nSaplingValue = diskindex.nSaplingValue;
pindexNew->segid = diskindex.segid;
pindexNew->nNotaryPay = diskindex.nNotaryPay;
}
};

// Thread pool setup
const int num_threads = std::min((unsigned int)MAX_LOADING_GUTS_THREADS, std::thread::hardware_concurrency());
std::vector<std::thread> workers;
for (int i = 0; i < num_threads; ++i)
{
workers.emplace_back(lblkIdx_worker);
}

int64_t count = 0;
// Load mapBlockIndex
while (pcursor->Valid())
{
boost::this_thread::interruption_point();
if (ShutdownRequested())
{
done.store(true);
queue_cond_var.notify_all();
for (auto &thread : workers)
{
thread.join();
}
return false;
}

std::pair<char, uint256> key;
if (pcursor->GetKey(key) && key.first == DB_BLOCK_INDEX)
{

if (count++ % 1000 == 0)
{
uint32_t high = 0x100 * *key.second.begin() + *(key.second.begin() + 1);
int percentageDone = static_cast<int>(high * 100.0 / 65536.0 + 0.5);
uiInterface.ShowProgress(_("Loading guts..."), percentageDone, false);
if (reportDone < percentageDone / 10)
{
// report max. every 10% step
LogPrintf("[%d%%]...", percentageDone); /* Continued */
reportDone = percentageDone / 10;
}
}

CDiskBlockIndex diskindex;
if (pcursor->GetValue(diskindex))
{
{
std::lock_guard<std::mutex> lock(queue_mutex);
task_queue.push(diskindex);
}
queue_cond_var.notify_one();

pcursor->Next();
}
else
{
done.store(true);
queue_cond_var.notify_all();
for (auto &thread : workers)
{
thread.join();
}
return error("LoadBlockIndex() : failed to read value");
}
}
else
{
break;
}
}

// Indicate that loading is done
done.store(true);
queue_cond_var.notify_all();

// Wait for all threads to finish
for (auto &thread : workers)
{
thread.join();
}

uiInterface.ShowProgress("", 100, false);
LogPrintf("[%s].\n", ShutdownRequested() ? "CANCELLED" : "DONE");

std::queue<CDiskBlockIndex> empty;
{
std::lock_guard<std::mutex> lock(queue_mutex);
std::swap(task_queue, empty);
}

return true;
}
1 change: 1 addition & 0 deletions src/txdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class CBlockTreeDB : public CDBWrapper
* @returns true on success
*/
bool LoadBlockIndexGuts();
bool LoadBlockIndexGutsFast();
/****
* Check if a block is on the active chain
* @param hash the block hash
Expand Down