Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement batches purge in ConsolidateState() #560

Merged
merged 1 commit into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 192 additions & 114 deletions src/hashdb64/state_manager_64.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ zkresult StateManager64::consolidateState(const string &_virtualStateRoot, const
gettimeofday(&t, NULL);
#endif

TimerStart(STATE_MANAGER_CONSOLIDATE_VIRTUAL_STATE);
TimerStart(STATE_MANAGER_CONSOLIDATE_STATE);

// Format the new state root
string virtualStateRoot = NormalizeToNFormat(_virtualStateRoot, 64);
Expand All @@ -663,151 +663,215 @@ zkresult StateManager64::consolidateState(const string &_virtualStateRoot, const

zkresult zkr;

// Find batch state for this uuid
// If it does not exist, we call db.flush() directly
// Find the batch that ends up with the virtual state root
uint64_t virtualStatePosition = 0;
string oldStateRoot;
unordered_map<string, BatchState64>::iterator it;
for (it = state.begin(); it != state.end(); it++)
for (virtualStatePosition = 0; virtualStatePosition < stateOrder.size(); virtualStatePosition++)
{
it = state.find(stateOrder[virtualStatePosition]);
if (it == state.end())
{
zklog.error("StateManager64::consolidateState() found unmatching state i=" + to_string(virtualStatePosition) + " batchUUI=" + stateOrder[virtualStatePosition]);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);
Unlock();
return ZKR_STATE_MANAGER;
}
if (it->second.newStateRoot == virtualStateRoot)
{
oldStateRoot = it->second.oldStateRoot;
break;
}
}
if (it == state.end())
if (virtualStatePosition == stateOrder.size())
{
zklog.error("StateManager64::consolidateState() found no batch state for virtualStateRoot=" + virtualStateRoot);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_VIRTUAL_STATE);
zklog.error("StateManager64::consolidateState() could not find a matching virtual state=" + virtualStateRoot);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);
Unlock();
return ZKR_STATE_MANAGER;
}
BatchState64 &batchState = it->second;

// Find the batch that ends up with the newStateRoot

// Determine the chain of state roots (i.e. previous batches) that leads to newStateRoot

// Delete previous batches that are not part of the chain

// Calculate the real state roots, and write them to the database

// We will store here the latest tx new state root
Goldilocks::Element newRoot[4];

// For all transactions
for (uint64_t tx = 0; tx < batchState.txState.size(); tx++)
for (int64_t i = (int64_t)virtualStatePosition - 1; i >= 0; i--)
{
// Get a reference to the current transaction state
TxState64 &txState = batchState.txState[tx];

// Get old state root for this tx
Goldilocks::Element oldRoot[4];
string2fea(fr, txState.persistence[persistence].oldStateRoot, oldRoot);

// Get the key-values for this tx
vector<KeyValue> keyValues;
unordered_map<string, mpz_class> &dbWrite = txState.persistence[persistence].subState[txState.persistence[persistence].subState.size() - 1].dbWrite;
unordered_map<string, mpz_class>::const_iterator it;
for (it = dbWrite.begin(); it != dbWrite.end(); it++)
// Find the batch state corresponding to this position
it = state.find(stateOrder[i]);
if (it == state.end())
{
KeyValue keyValue;
string2fea(fr, it->first, keyValue.key);
keyValue.value = it->second;
keyValues.emplace_back(keyValue);
}

// Call WriteTree and get the new state root
zkr = tree64.WriteTree(db, oldRoot, keyValues, newRoot, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling WriteTree zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState WriteTree failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
zklog.error("StateManager64::consolidateState() could not find a matching virtual state=" + stateOrder[i]);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);
Unlock();
return ZKR_STATE_MANAGER;
}

// Save the real state root of this tx
string newRootString = NormalizeToNFormat(fea2string(fr, newRoot), 64);
txState.persistence[persistence].newStateRoot = newRootString;

// Save the old state root of the next tx, if any
if (tx < batchState.txState.size() - 1)
// If this state is part of the state chain, continue searching
if (oldStateRoot == it->second.newStateRoot)
{
batchState.txState[tx+1].persistence[persistence].oldStateRoot = newRootString;
}
// If this is the last tx, then save the new state root of the batch
else
{
batchState.newStateRoot = newRootString;
oldStateRoot = it->second.oldStateRoot;
continue;
}

// Create a new version, i.e. read latest version and increment it
uint64_t version;
zkr = db.readLatestVersion(version);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling db.readLatestVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.createLatestVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}
version++;
// Otherwise, we must delete this state, and shift the virtual state position
state.erase(it);
stateOrder.erase(stateOrder.begin() + i);
virtualStatePosition--;
}

// Save the key-values
zkr = db.writeKV(version, keyValues, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
// Calculate the real state roots, and write them to the database

// We will store here the latest tx new state root
Goldilocks::Element newRoot[4];

// For all batches in the state root chain

for (uint64_t batch = 0; batch <= virtualStatePosition; batch++)
{
// Find the batch state corresponding to this position
it = state.find(stateOrder[batch]);
if (it == state.end())
{
zklog.error("StateManager64::consolidateState() failed calling db.writeKV zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateStatedb.writeKV failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
zklog.error("StateManager64::consolidateState() could not find a matching virtual state=" + stateOrder[batch]);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);
Unlock();
return ZKR_STATE_MANAGER;
}

// Write the new version, associated with the new root
zkr = db.writeVersion(newRoot, version, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
BatchState64 &batchState = state[stateOrder[batch]];

if (batch > 0)
{
zklog.error("StateManager64::consolidateState() failed calling db.writeVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.writeVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
string newStateRootString = NormalizeToNFormat(fea2string(fr, newRoot), 64);
batchState.oldStateRoot = newStateRootString;
if (batchState.txState.size() > 0)
{
batchState.txState[0].persistence[persistence].oldStateRoot = newStateRootString;
}
}

// Write the latest version
zkr = db.writeLatestVersion(version, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
// For all transactions
for (uint64_t tx = 0; tx < batchState.txState.size(); tx++)
{
zklog.error("StateManager64::consolidateState() failed calling db.writeLatestVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
// Get a reference to the current transaction state
TxState64 &txState = batchState.txState[tx];

// Get old state root for this tx
Goldilocks::Element oldRoot[4];
string2fea(fr, txState.persistence[persistence].oldStateRoot, oldRoot);

// Get the key-values for this tx
vector<KeyValue> keyValues;
unordered_map<string, mpz_class> &dbWrite = txState.persistence[persistence].subState[txState.persistence[persistence].subState.size() - 1].dbWrite;
unordered_map<string, mpz_class>::const_iterator it;
for (it = dbWrite.begin(); it != dbWrite.end(); it++)
{
KeyValue keyValue;
string2fea(fr, it->first, keyValue.key);
keyValue.value = it->second;
keyValues.emplace_back(keyValue);
}

// Call WriteTree and get the new state root
zkr = tree64.WriteTree(db, oldRoot, keyValues, newRoot, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling WriteTree zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState WriteTree failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}

// Save the real state root of this tx
string newRootString = NormalizeToNFormat(fea2string(fr, newRoot), 64);
txState.persistence[persistence].newStateRoot = newRootString;

// Save the old state root of the next tx, if any
if (tx < batchState.txState.size() - 1)
{
batchState.txState[tx+1].persistence[persistence].oldStateRoot = newRootString;
}
// If this is the last tx, then save the new state root of the batch
else
{
batchState.newStateRoot = newRootString;
}

// Create a new version, i.e. read latest version and increment it
uint64_t version;
zkr = db.readLatestVersion(version);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling db.readLatestVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.createLatestVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}
version++;

// Save the key-values
zkr = db.writeKV(version, keyValues, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling db.writeKV zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateStatedb.writeKV failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}

// Write the new version, associated with the new root
zkr = db.writeVersion(newRoot, version, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling db.writeVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.writeVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}

// Write the latest version
zkr = db.writeLatestVersion(version, persistence == PERSISTENCE_DATABASE ? true : false);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::consolidateState() failed calling db.writeLatestVersion zkr=" + zkresult2string(zkr) +
" tx=" + to_string(tx) +
" txState.oldStateRoot=" + txState.persistence[persistence].oldStateRoot);
#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.writeLatestVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}

} // For all transactions

#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState db.writeLatestVersion failed", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
batchState.timeMetricStorage.add("consolidateState success", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
Unlock();
return ZKR_STATE_MANAGER;
}

} // For all transactions
} // For all batches

// Return the consolidated state root
consolidatedStateRoot = NormalizeToNFormat(fea2string(fr, newRoot), 64);
Expand All @@ -820,14 +884,28 @@ zkresult StateManager64::consolidateState(const string &_virtualStateRoot, const
zklog.error("StateManager64::consolidateState() failed calling db.flush() result=" + zkresult2string(zkr));
}

#ifdef LOG_TIME_STATISTICS_STATE_MANAGER
batchState.timeMetricStorage.add("consolidateState success", TimeDiff(t));
batchState.timeMetricStorage.print("State Manager calls");
#endif
// Delete all batches of the virtual state chain

for (int64_t i = (int64_t)virtualStatePosition; i >= 0; i--)
{
// Find the batch state corresponding to this position
it = state.find(stateOrder[i]);
if (it == state.end())
{
zklog.error("StateManager64::consolidateState() could not find a matching virtual state=" + stateOrder[i]);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);
Unlock();
return ZKR_STATE_MANAGER;
}

// Delete this state, and shift the virtual state position
state.erase(it);
stateOrder.erase(stateOrder.begin() + i);
}

Unlock();

TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_VIRTUAL_STATE);
TimerStopAndLog(STATE_MANAGER_CONSOLIDATE_STATE);

return zkr;
}
Expand Down
Loading