Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing issues in key-value database #548

Merged
merged 16 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/config/zkresult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ struct
{ ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH, "ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH" },
{ ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE" },
{ ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE" },
{ ZKR_DB_VERSION_NOT_FOUND, "ZKR_DB_VERSION_NOT_FOUND" },
{ ZKR_DB_VERSION_NOT_FOUND_KVDB, "ZKR_DB_VERSION_NOT_FOUND_KBDB" },
{ ZKR_DB_VERSION_NOT_FOUND_GLOBAL, "ZKR_DB_VERSION_NOT_FOUND_GLOBAL"}


Expand Down
2 changes: 1 addition & 1 deletion src/config/zkresult.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ typedef enum : int
ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH = 78, // Main state memory align read ROM operation check failed
ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE = 79, // Main state Keccak hash check found read out of range
ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE = 80, // Main state Poseidon hash check found read out of range
ZKR_DB_VERSION_NOT_FOUND = 81, // Version not found in KeyValue database
ZKR_DB_VERSION_NOT_FOUND_KVDB = 81, // Version not found in KeyValue database
ZKR_DB_VERSION_NOT_FOUND_GLOBAL = 82, // Version not found in KeyValue database and not present in hashDB neither


Expand Down
223 changes: 109 additions & 114 deletions src/hashdb64/database_64.cpp

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions src/hashdb64/database_64.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,22 @@ class Database64
pthread_t senderPthread; // Database sender thread
pthread_t cacheSynchPthread; // Cache synchronization thread
int maxVersions; // Maximum number of versions to store in the database KV
int maxVersionsUpload; // Maximum number of versions to upload from the database KV to the cache when there is a cache miss

private:
// Remote database based on Postgres (PostgreSQL)
void initRemote(void);
zkresult readRemote(bool bProgram, const string &key, string &value);
zkresult writeRemote(bool bProgram, const string &key, const string &value);

zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value);
zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool noMultiWrite = false);
zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value, vector<VersionValue> &upstreamVersionValues);
zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool useMultiWrite = true);
zkresult readRemoteVersion(const Goldilocks::Element (&root)[4], uint64_t version);
zkresult writeRemoteVersion(const Goldilocks::Element (&root)[4], const uint64_t version);
zkresult readRemoteLatestVersion(uint64_t &version);
zkresult writeRemoteLatestVersion(const uint64_t version);

zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value);
zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value, vector<VersionValue> &upstreamVersionValues);

public:
#ifdef DATABASE_USE_CACHE
Expand Down
191 changes: 172 additions & 19 deletions src/hashdb64/database_kv_associative_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ DatabaseKVAssociativeCache::DatabaseKVAssociativeCache()
indexesSize = 0;
log2CacheSize = 0;
cacheSize = 0;
maxVersions = 100; //rick: as parameter
maxVersions = 0;
indexes = NULL;
keys = NULL;
values = NULL;
Expand Down Expand Up @@ -61,8 +61,7 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac
exitProcess();
}
cacheSize = 1 << log2CacheSize_;
maxVersions = 100; //rick: as parameter

maxVersions = cacheSize;

if(indexes != NULL) delete[] indexes;
indexes = new uint32_t[indexesSize];
Expand All @@ -81,6 +80,11 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac

if(versions != NULL) delete[] versions;
versions = new uint64_t[2 * cacheSize];
#pragma omp parallel for schedule(static) num_threads(4)
for (size_t i = 0; i < cacheSize; i++)
{
versions[i*2+1] = UINT64_MAX;
}

currentCacheIndex = 0;
attempts = 0;
Expand All @@ -92,15 +96,14 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac
indexesMask = indexesSize - 1;
};

void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update){
void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value){

lock_guard<recursive_mutex> guard(mlock);
bool emptySlot = false;
bool present = false;
bool presentSameVersion = false;
uint32_t cacheIndex;
uint32_t tableIndexUse=0;
uint32_t cacheIndexPrev;
uint64_t cacheIndexPrev=UINT64_MAX;

//
// Check if present in one of the four slots
Expand All @@ -114,6 +117,7 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;
uint32_t cacheIndexValue = cacheIndex;

if (!emptyCacheSlot(cacheIndexRaw)){
if( keys[cacheIndexKey + 0].fe == key[0].fe &&
Expand All @@ -122,9 +126,16 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
keys[cacheIndexKey + 3].fe == key[3].fe){
present = true;
if(versions[cacheIndexVersions] == version){
presentSameVersion = true;
if(update == false) return;
return; //no update
} else if (version < versions[cacheIndexVersions]){
zklog.error("DatabaseKVAssociativeCache::addKeyValueVersion() adding lower version than the current one");
exitProcess();
} else {
if(value==values[cacheIndexValue]){
return;
}
}

tableIndexUse = tableIndex;
cacheIndexPrev = cacheIndex;
break;
Expand All @@ -138,13 +149,12 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
//
// Evaluate cacheIndexKey and
//
if(!presentSameVersion){
if(emptySlot == true || present){
indexes[tableIndexUse] = currentCacheIndex;
}
cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
if(emptySlot == true || present){
indexes[tableIndexUse] = currentCacheIndex;
}
cacheIndex = (uint32_t)(currentCacheIndex & cacheMask);
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);

uint64_t cacheIndexKey, cacheIndexValue, cacheIndexVersions;
cacheIndexKey = cacheIndex * 4;
cacheIndexValue = cacheIndex;
Expand All @@ -159,10 +169,10 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
keys[cacheIndexKey + 3].fe = key[3].fe;
values[cacheIndexValue] = value;
versions[cacheIndexVersions] = version;
if(present & !presentSameVersion){
if(present){
versions[cacheIndexVersions+1] = cacheIndexPrev;
}else{
versions[cacheIndexVersions+1] = 0;
versions[cacheIndexVersions+1] = UINT64_MAX;
}
//
// Forced index insertion
Expand All @@ -175,6 +185,148 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons
}
}

void DatabaseKVAssociativeCache::downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]){

lock_guard<recursive_mutex> guard(mlock);
bool presentSameVersion = false;
uint32_t cacheIndexZero = 0;
//
// Check if present in one of the four slots
//
Goldilocks::Element key_hashed[4];
hashKey(key_hashed, key);
bool breakOuterLoop = false;
for (int i = 0; i < 4; ++i)
{
uint32_t tableIndex = (uint32_t)(key_hashed[i].fe & indexesMask);
uint32_t cacheIndexRaw = indexes[tableIndex];
uint32_t cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;

if (!emptyCacheSlot(cacheIndexRaw)){
for(int j=0; j<maxVersions; j++){
if (keys[cacheIndexKey + 0].fe == key[0].fe &&
keys[cacheIndexKey + 1].fe == key[1].fe &&
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

if( versions[cacheIndexVersions] == version &&
versions[cacheIndexVersions+1] == UINT64_MAX){ //if linked zero can not be added){

presentSameVersion = true;
cacheIndexZero = currentCacheIndex & cacheMask;
currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1);
versions[cacheIndexVersions+1] = cacheIndexZero;
breakOuterLoop = true;
break;
}
if(versions[cacheIndexVersions+1]==UINT64_MAX) return; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return;
}else{
break;
}
}
}
}
if(breakOuterLoop) break;
}

//
// Evaluate cacheIndexKey and
//
if(presentSameVersion){

uint64_t cacheIndexKey = cacheIndexZero * 4;
uint64_t cacheIndexValue = cacheIndexZero;
uint64_t cacheIndexVersions = cacheIndexZero * 2;

//
// Add value
//
keys[cacheIndexKey + 0].fe = key[0].fe;
keys[cacheIndexKey + 1].fe = key[1].fe;
keys[cacheIndexKey + 2].fe = key[2].fe;
keys[cacheIndexKey + 3].fe = key[3].fe;
values[cacheIndexValue] = 0;
versions[cacheIndexVersions] = 0;
versions[cacheIndexVersions+1] = UINT64_MAX;
}
return;

}

void DatabaseKVAssociativeCache::uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector<VersionValue> &versionsValues){

lock_guard<recursive_mutex> guard(mlock);
//Get last version for the key
vector<uint64_t> versions;
getLastCachedVersions(key, versions, versionsValues.size());
if(versions.size()==0){
for (std::vector<VersionValue>::reverse_iterator it = versionsValues.rbegin(); it != versionsValues.rend(); ++it) {
addKeyValueVersion(it->version, key, it->value);
}
}else{
vector<VersionValue>::const_iterator it = versionsValues.begin();
for(vector<uint64_t>::const_iterator it2 = versions.begin(); it2 != versions.end(); ++it2, ++it){
if(*it2 != it->version){
zklog.error("DatabaseKVAssociativeCache::uploadKeyValueVersions() versions mismatch between cache and db");
exitProcess();
}
}
//If vector.zize() < versionsValues.size() we could add some backward versions in the cache but not supported yet
}
}

void DatabaseKVAssociativeCache::getLastCachedVersions(const Goldilocks::Element (&key)[4], vector<uint64_t> &versions, const int maxVersionsOut){

versions.clear();
lock_guard<recursive_mutex> guard(mlock);
//
// Find the value
//
Goldilocks::Element key_hashed[4];
hashKey(key_hashed, key);
for (int i = 0; i < 4; i++)
{
uint32_t cacheIndexRaw = indexes[key_hashed[i].fe & indexesMask];
if (emptyCacheSlot(cacheIndexRaw)) continue;

uint32_t cacheIndex = cacheIndexRaw & cacheMask;
uint32_t cacheIndexKey = cacheIndex * 4;
uint32_t cacheIndexVersions = cacheIndex * 2;

for(int j=0; j<maxVersionsOut; j++){
if (keys[cacheIndexKey + 0].fe == key[0].fe &&
keys[cacheIndexKey + 1].fe == key[1].fe &&
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

versions.push_back(versions[cacheIndexVersions]);
if(versions[cacheIndexVersions+1]==UINT64_MAX) return; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return;
}else{
break;
}
}
}
}
return;

}

void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)[10], int &iters)
{
uint32_t inputRawCacheIndex = usedRawCacheIndexes[iters];
Expand All @@ -191,7 +343,7 @@ void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)
// find a slot into my indexes
//
Goldilocks::Element key_hashed[4];
hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]);
hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]);
Goldilocks::Element *inputKey = &key_hashed[0];
uint32_t minRawCacheIndex = UINT32_MAX;
int pos = -1;
Expand Down Expand Up @@ -265,16 +417,17 @@ bool DatabaseKVAssociativeCache::findKey( const uint64_t version, const Goldiloc
keys[cacheIndexKey + 2].fe == key[2].fe &&
keys[cacheIndexKey + 3].fe == key[3].fe){

if( versions[cacheIndexVersions] <= version){ //rick: I assume they are ordered
if( version >= versions[cacheIndexVersions] ){
uint32_t cacheIndexValue = cacheIndex;
++hits;
value = values[cacheIndexValue];
return true;
}
if(versions[cacheIndexVersions+1]==UINT64_MAX) return false; //No more versions
cacheIndex = versions[cacheIndexVersions+1] & cacheMask;
cacheIndexKey = cacheIndex * 4;
cacheIndexVersions = cacheIndex * 2;

}else{
if(j>0){
return false;
Expand Down
17 changes: 6 additions & 11 deletions src/hashdb64/database_kv_associative_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "zklog.hpp"
#include "zkmax.hpp"
#include "poseidon_goldilocks.hpp"
#include "version_value.hpp"


using namespace std;
Expand Down Expand Up @@ -42,8 +43,12 @@ class DatabaseKVAssociativeCache
~DatabaseKVAssociativeCache();
void postConstruct(int log2IndexesSize_, int log2CacheSize_, string name_);

void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update);
void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value);
void downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]);
void uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector<VersionValue> &versionsValues);
bool findKey( const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class &value);
void getLastCachedVersions(const Goldilocks::Element (&key)[4], vector<uint64_t> &versions, const int maxVersions);


inline bool enabled() const { return (log2IndexesSize > 0); };
inline uint32_t getCacheSize() const { return cacheSize; };
Expand All @@ -66,16 +71,6 @@ class DatabaseKVAssociativeCache
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir
Goldilocks::Element key_hash_imput[12];
for(int i=0; i<4; i++){
key_hash_imput[i] = keyIn[i];
key_hash_imput[i+4] = keyIn[i];
key_hash_imput[i+8] = keyIn[i];
}
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};

};
#endif
2 changes: 1 addition & 1 deletion src/hashdb64/database_versions_associtive_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void DatabaseVersionsAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIn
// find a slot into my indexes
//
Goldilocks::Element key_hashed[4];
hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]);
hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]);
Goldilocks::Element *inputKey = &key_hashed[0];
uint32_t minRawCacheIndex = UINT32_MAX;
int pos = -1;
Expand Down
10 changes: 0 additions & 10 deletions src/hashdb64/database_versions_associtive_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ class DatabaseVersionsAssociativeCache
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir
Goldilocks::Element key_hash_imput[12];
for(int i=0; i<4; i++){
key_hash_imput[i] = keyIn[i];
key_hash_imput[i+4] = keyIn[i];
key_hash_imput[i+8] = keyIn[i];
}
PoseidonGoldilocks pg;
pg.hash_seq(keyOut, key_hash_imput);
};
};
#endif

Loading