diff --git a/src/config/zkresult.cpp b/src/config/zkresult.cpp index eb9532852..6145f9b8a 100644 --- a/src/config/zkresult.cpp +++ b/src/config/zkresult.cpp @@ -87,7 +87,7 @@ struct { ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH, "ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH" }, { ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE" }, { ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE, "ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE" }, - { ZKR_DB_VERSION_NOT_FOUND, "ZKR_DB_VERSION_NOT_FOUND" }, + { ZKR_DB_VERSION_NOT_FOUND_KVDB, "ZKR_DB_VERSION_NOT_FOUND_KBDB" }, { ZKR_DB_VERSION_NOT_FOUND_GLOBAL, "ZKR_DB_VERSION_NOT_FOUND_GLOBAL"} diff --git a/src/config/zkresult.hpp b/src/config/zkresult.hpp index 01ece0449..4294fc59f 100644 --- a/src/config/zkresult.hpp +++ b/src/config/zkresult.hpp @@ -88,7 +88,7 @@ typedef enum : int ZKR_SM_MAIN_MEMALIGN_READ_MISMATCH = 78, // Main state memory align read ROM operation check failed ZKR_SM_MAIN_HASHK_READ_OUT_OF_RANGE = 79, // Main state Keccak hash check found read out of range ZKR_SM_MAIN_HASHP_READ_OUT_OF_RANGE = 80, // Main state Poseidon hash check found read out of range - ZKR_DB_VERSION_NOT_FOUND = 81, // Version not found in KeyValue database + ZKR_DB_VERSION_NOT_FOUND_KVDB = 81, // Version not found in KeyValue database ZKR_DB_VERSION_NOT_FOUND_GLOBAL = 82, // Version not found in KeyValue database and not present in hashDB neither diff --git a/src/hashdb64/database_64.cpp b/src/hashdb64/database_64.cpp index 6ec6554a5..b56b3dc12 100644 --- a/src/hashdb64/database_64.cpp +++ b/src/hashdb64/database_64.cpp @@ -15,6 +15,7 @@ #include "zkmax.hpp" #include "hashdb_remote.hpp" #include "key_value.hpp" +#include "tree_64.hpp" #ifdef DATABASE_USE_CACHE @@ -42,7 +43,8 @@ Database64::Database64 (Goldilocks &fr, const Config &config) : config(config), connectionsPool(NULL), multiWrite(fr), - maxVersions(config.kvDBMaxVersions) + maxVersions(config.kvDBMaxVersions), + maxVersionsUpload(10) //add into config if needed { // Init mutex pthread_mutex_init(&connMutex, NULL); @@ -322,13 +324,13 @@ zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldiloc zkresult rkv = ZKR_UNSPECIFIED; zkresult rv = ZKR_UNSPECIFIED; + zkresult rout = ZKR_UNSPECIFIED; string keyStr = ""; if(dbReadLog->getSaveKeys()){ string keyStr_ = fea2string(fr, key[0], key[1], key[2], key[3]); keyStr = NormalizeToNFormat(keyStr_, 64); } - KeyValue mwEntry; uint64_t version; rv = readVersion(root, version, dbReadLog); @@ -343,69 +345,63 @@ zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldiloc } // If the key is pending to be stored in database, but already deleted from cache - else if (config.dbMultiWrite && multiWrite.findKeyValue(version, mwEntry)) + else if (config.dbMultiWrite && multiWrite.findKeyValue(version, key, value)) { - value = mwEntry.value; // Add to the read log if (dbReadLog != NULL) dbReadLog->add(keyStr, value, true, TimeDiff(t)); - - // Store it locally to avoid any future remote access for this key - dbKVACache.addKeyValueVersion(version, key, value, false); + // We do not store into cache as we do not want to manage the chain of versions rkv = ZKR_SUCCESS; } else if(useRemoteDB) { - - rkv = readRemoteKV(version, key, value); - if ( (rkv != ZKR_SUCCESS) && (config.dbReadRetryDelay > 0) ) - { - if(!dbReadLog->getSaveKeys()){ - string keyStr_ = fea2string(fr, key[0], key[1], key[2], key[3]); - keyStr = NormalizeToNFormat(keyStr_, 64); - } - for (uint64_t i=0; i &upstreamVersionValues) { const string &tableName = config.dbKeyValueTableName; @@ -1103,7 +1073,10 @@ zkresult Database64::readRemoteKV(const uint64_t version, const Goldilocks::Elem zklog.error("DatabaseKV::readRemoteKV() table=" + tableName + " got an invalid number of colums for the row: " + to_string(row.size()) + "for key=" + keyStr); exitProcess(); } - return extractVersion(row[1], version, value); + + // Dispose the read db conneciton + disposeConnection(pDatabaseConnection); + return extractVersion(row[1], version, value, upstreamVersionValues); } @@ -1118,25 +1091,35 @@ zkresult Database64::readRemoteKV(const uint64_t version, const Goldilocks::Elem // Dispose the read db conneciton disposeConnection(pDatabaseConnection); - return ZKR_SUCCESS; + return ZKR_DB_ERROR;//should never return here } -zkresult Database64::extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value){ +zkresult Database64::extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value, vector &upstreamVersionValues){ + upstreamVersionValues.clear(); int data_size = 0; if(!fieldData.is_null()){ string data = removeBSXIfExists64(fieldData.c_str()); data_size = data.size(); zkassert(data_size % 80 == 0); + int nUpstreams = 0; for (int i = 0; i < data_size; i += 80) { + string versionStr = data.substr(i, 16); mpz_class aux(versionStr, 16); uint64_t version_ = aux.get_ui(); - if(version_==version){ + if(nUpstreams < maxVersions){ + VersionValue vv; + vv.version = version_; + vv.value = mpz_class(data.substr(i + 16, 64), 16); + upstreamVersionValues.push_back(vv); + } + if(version >= version_){ value = mpz_class(data.substr(i + 16, 64), 16); return ZKR_SUCCESS; } + ++nUpstreams; } } /*const char * data = fieldData.c_str(); @@ -1158,14 +1141,14 @@ zkresult Database64::extractVersion(const pqxx::field& fieldData, const uint64_t zklog.error("Database64::extractVersion() got an invalid data size: " + to_string(data_size)); exitProcess(); } - return ZKR_DB_VERSION_NOT_FOUND; + return ZKR_DB_VERSION_NOT_FOUND_KVDB; } } -zkresult Database64::writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool noMultiWrite) +zkresult Database64::writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool useMultiWrite) { zkresult result = ZKR_SUCCESS; - if (config.dbMultiWrite && !noMultiWrite) + if (config.dbMultiWrite && useMultiWrite) { multiWrite.Lock(); KeyValue auxKV; @@ -1174,7 +1157,10 @@ zkresult Database64::writeRemoteKV(const uint64_t version, const Goldilocks::Ele auxKV.key[2] = key[2]; auxKV.key[3] = key[3]; auxKV.value = value; - multiWrite.data[multiWrite.pendingToFlushDataIndex].keyValueIntray[version] = auxKV; + multiWrite.data[multiWrite.pendingToFlushDataIndex].keyValueAIntray[version].push_back(auxKV); + string keyStr_ = fea2string(fr, key[0], key[1], key[2], key[3]); + string keyStr = NormalizeToNFormat(keyStr_, 64); + multiWrite.data[multiWrite.pendingToFlushDataIndex].keyVersionsIntray[keyStr].push_back(version); #ifdef LOG_DB_WRITE_REMOTE zklog.info("Database64::writeRemote() version=" + to_string(version) + " multiWrite=[" + multiWrite.print() + "]"); #endif @@ -1215,32 +1201,37 @@ zkresult Database64::writeRemoteKV(const uint64_t version, const Goldilocks::Ele n.commit(); // Process the result - if (rows.size() == 0) - { - disposeConnection(pDatabaseConnection); - return ZKR_DB_KEY_NOT_FOUND; - } - else if (rows.size() > 1) - { - zklog.error("Database64::writeRemoteKV() table=" + tableName + " got more than one row for the same key: " + to_string(rows.size()) + "for key=" + keyStr); - exitProcess(); - } + if(rows.size() > 0){ + if (rows.size() > 1) + { + zklog.error("Database64::writeRemoteKV() table=" + tableName + " got more than one row for the same key: " + to_string(rows.size()) + "for key=" + keyStr); + exitProcess(); + } - const pqxx::row& row = rows[0]; - if (row.size() != 2) - { - zklog.error("DatabaseKV::writeRemoteKV() table=" + tableName + " got an invalid number of colums for the row: " + to_string(row.size()) + "for key=" + keyStr); - exitProcess(); - } - const pqxx::field& fieldData = row[1]; - //processar insert; - if(!fieldData.is_null()){ - string data = removeBSXIfExists64(fieldData.c_str()); - int data_size = data.size(); - if(data_size == maxVersions*80){ - data = data.substr(0, data_size - 80); + const pqxx::row& row = rows[0]; + if (row.size() != 2) + { + zklog.error("DatabaseKV::writeRemoteKV() table=" + tableName + " got an invalid number of colums for the row: " + to_string(row.size()) + "for key=" + keyStr); + exitProcess(); + } + const pqxx::field& fieldData = row[1]; + //processar insert; + if(!fieldData.is_null()){ + string data = removeBSXIfExists64(fieldData.c_str()); + int data_size = data.size(); + if(data_size == maxVersions*80){ + data = data.substr(0, data_size - 80); + } + insertStr = insertStr + data; } - insertStr = insertStr + data; + } else{ + string valueZero = NormalizeToNFormat("0",64); + string versionZero = NormalizeToNFormat(U64toString(0,16),16); + string insertZero = versionStr + valueStr; + insertStr = insertStr + versionZero + valueZero; + mpz_class zero(0); + dbKVACache.downstreamAddKeyZeroVersion(version, key); + } } catch (const std::exception &e) @@ -1974,12 +1965,16 @@ zkresult Database64::sendData (void) } // If there are keyValues add to db - if (data.keyValue.size() > 0) + if (data.keyValueA.size() > 0) { - unordered_map::const_iterator it=data.keyValue.begin(); - while (it != data.keyValue.end()) + map>::const_iterator it=data.keyValueA.begin(); + while (it != data.keyValueA.end()) { - writeRemoteKV(it->first,it->second.key,it->second.value, true); + for(auto it2=it->second.begin(); it2!=it->second.end(); it2++) + { + writeRemoteKV(it->first,it2->key,it2->value, false); + } + ++it; } } // If there are versions add to db diff --git a/src/hashdb64/database_64.hpp b/src/hashdb64/database_64.hpp index 65886726d..f4543370e 100644 --- a/src/hashdb64/database_64.hpp +++ b/src/hashdb64/database_64.hpp @@ -74,6 +74,7 @@ class Database64 pthread_t senderPthread; // Database sender thread pthread_t cacheSynchPthread; // Cache synchronization thread int maxVersions; // Maximum number of versions to store in the database KV + int maxVersionsUpload; // Maximum number of versions to upload from the database KV to the cache when there is a cache miss private: // Remote database based on Postgres (PostgreSQL) @@ -81,14 +82,14 @@ class Database64 zkresult readRemote(bool bProgram, const string &key, string &value); zkresult writeRemote(bool bProgram, const string &key, const string &value); - zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value); - zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool noMultiWrite = false); + zkresult readRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class value, vector &upstreamVersionValues); + zkresult writeRemoteKV(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool useMultiWrite = true); zkresult readRemoteVersion(const Goldilocks::Element (&root)[4], uint64_t version); zkresult writeRemoteVersion(const Goldilocks::Element (&root)[4], const uint64_t version); zkresult readRemoteLatestVersion(uint64_t &version); zkresult writeRemoteLatestVersion(const uint64_t version); - zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value); + zkresult extractVersion(const pqxx::field& fieldData, const uint64_t version, mpz_class &value, vector &upstreamVersionValues); public: #ifdef DATABASE_USE_CACHE diff --git a/src/hashdb64/database_kv_associative_cache.cpp b/src/hashdb64/database_kv_associative_cache.cpp index b066bf939..701367dd8 100644 --- a/src/hashdb64/database_kv_associative_cache.cpp +++ b/src/hashdb64/database_kv_associative_cache.cpp @@ -14,7 +14,7 @@ DatabaseKVAssociativeCache::DatabaseKVAssociativeCache() indexesSize = 0; log2CacheSize = 0; cacheSize = 0; - maxVersions = 100; //rick: as parameter + maxVersions = 0; indexes = NULL; keys = NULL; values = NULL; @@ -61,8 +61,7 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac exitProcess(); } cacheSize = 1 << log2CacheSize_; - maxVersions = 100; //rick: as parameter - + maxVersions = cacheSize; if(indexes != NULL) delete[] indexes; indexes = new uint32_t[indexesSize]; @@ -81,6 +80,11 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac if(versions != NULL) delete[] versions; versions = new uint64_t[2 * cacheSize]; + #pragma omp parallel for schedule(static) num_threads(4) + for (size_t i = 0; i < cacheSize; i++) + { + versions[i*2+1] = UINT64_MAX; + } currentCacheIndex = 0; attempts = 0; @@ -92,15 +96,14 @@ void DatabaseKVAssociativeCache::postConstruct(int log2IndexesSize_, int log2Cac indexesMask = indexesSize - 1; }; -void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update){ +void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value){ lock_guard guard(mlock); bool emptySlot = false; bool present = false; - bool presentSameVersion = false; uint32_t cacheIndex; uint32_t tableIndexUse=0; - uint32_t cacheIndexPrev; + uint64_t cacheIndexPrev=UINT64_MAX; // // Check if present in one of the four slots @@ -114,6 +117,7 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons cacheIndex = cacheIndexRaw & cacheMask; uint32_t cacheIndexKey = cacheIndex * 4; uint32_t cacheIndexVersions = cacheIndex * 2; + uint32_t cacheIndexValue = cacheIndex; if (!emptyCacheSlot(cacheIndexRaw)){ if( keys[cacheIndexKey + 0].fe == key[0].fe && @@ -122,9 +126,16 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons keys[cacheIndexKey + 3].fe == key[3].fe){ present = true; if(versions[cacheIndexVersions] == version){ - presentSameVersion = true; - if(update == false) return; + return; //no update + } else if (version < versions[cacheIndexVersions]){ + zklog.error("DatabaseKVAssociativeCache::addKeyValueVersion() adding lower version than the current one"); + exitProcess(); + } else { + if(value==values[cacheIndexValue]){ + return; + } } + tableIndexUse = tableIndex; cacheIndexPrev = cacheIndex; break; @@ -138,13 +149,12 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons // // Evaluate cacheIndexKey and // - if(!presentSameVersion){ - if(emptySlot == true || present){ - indexes[tableIndexUse] = currentCacheIndex; - } - cacheIndex = (uint32_t)(currentCacheIndex & cacheMask); - currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1); + if(emptySlot == true || present){ + indexes[tableIndexUse] = currentCacheIndex; } + cacheIndex = (uint32_t)(currentCacheIndex & cacheMask); + currentCacheIndex = (currentCacheIndex == UINT32_MAX) ? 0 : (currentCacheIndex + 1); + uint64_t cacheIndexKey, cacheIndexValue, cacheIndexVersions; cacheIndexKey = cacheIndex * 4; cacheIndexValue = cacheIndex; @@ -159,10 +169,10 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons keys[cacheIndexKey + 3].fe = key[3].fe; values[cacheIndexValue] = value; versions[cacheIndexVersions] = version; - if(present & !presentSameVersion){ + if(present){ versions[cacheIndexVersions+1] = cacheIndexPrev; }else{ - versions[cacheIndexVersions+1] = 0; + versions[cacheIndexVersions+1] = UINT64_MAX; } // // Forced index insertion @@ -175,6 +185,148 @@ void DatabaseKVAssociativeCache::addKeyValueVersion(const uint64_t version, cons } } +void DatabaseKVAssociativeCache::downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]){ + + lock_guard guard(mlock); + bool presentSameVersion = false; + uint32_t cacheIndexZero = 0; + // + // Check if present in one of the four slots + // + Goldilocks::Element key_hashed[4]; + hashKey(key_hashed, key); + bool breakOuterLoop = false; + for (int i = 0; i < 4; ++i) + { + uint32_t tableIndex = (uint32_t)(key_hashed[i].fe & indexesMask); + uint32_t cacheIndexRaw = indexes[tableIndex]; + uint32_t cacheIndex = cacheIndexRaw & cacheMask; + uint32_t cacheIndexKey = cacheIndex * 4; + uint32_t cacheIndexVersions = cacheIndex * 2; + + if (!emptyCacheSlot(cacheIndexRaw)){ + for(int j=0; j0){ + return; + }else{ + break; + } + } + } + } + if(breakOuterLoop) break; + } + + // + // Evaluate cacheIndexKey and + // + if(presentSameVersion){ + + uint64_t cacheIndexKey = cacheIndexZero * 4; + uint64_t cacheIndexValue = cacheIndexZero; + uint64_t cacheIndexVersions = cacheIndexZero * 2; + + // + // Add value + // + keys[cacheIndexKey + 0].fe = key[0].fe; + keys[cacheIndexKey + 1].fe = key[1].fe; + keys[cacheIndexKey + 2].fe = key[2].fe; + keys[cacheIndexKey + 3].fe = key[3].fe; + values[cacheIndexValue] = 0; + versions[cacheIndexVersions] = 0; + versions[cacheIndexVersions+1] = UINT64_MAX; + } + return; + +} + +void DatabaseKVAssociativeCache::uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector &versionsValues){ + + lock_guard guard(mlock); + //Get last version for the key + vector versions; + getLastCachedVersions(key, versions, versionsValues.size()); + if(versions.size()==0){ + for (std::vector::reverse_iterator it = versionsValues.rbegin(); it != versionsValues.rend(); ++it) { + addKeyValueVersion(it->version, key, it->value); + } + }else{ + vector::const_iterator it = versionsValues.begin(); + for(vector::const_iterator it2 = versions.begin(); it2 != versions.end(); ++it2, ++it){ + if(*it2 != it->version){ + zklog.error("DatabaseKVAssociativeCache::uploadKeyValueVersions() versions mismatch between cache and db"); + exitProcess(); + } + } + //If vector.zize() < versionsValues.size() we could add some backward versions in the cache but not supported yet + } +} + +void DatabaseKVAssociativeCache::getLastCachedVersions(const Goldilocks::Element (&key)[4], vector &versions, const int maxVersionsOut){ + + versions.clear(); + lock_guard guard(mlock); + // + // Find the value + // + Goldilocks::Element key_hashed[4]; + hashKey(key_hashed, key); + for (int i = 0; i < 4; i++) + { + uint32_t cacheIndexRaw = indexes[key_hashed[i].fe & indexesMask]; + if (emptyCacheSlot(cacheIndexRaw)) continue; + + uint32_t cacheIndex = cacheIndexRaw & cacheMask; + uint32_t cacheIndexKey = cacheIndex * 4; + uint32_t cacheIndexVersions = cacheIndex * 2; + + for(int j=0; j0){ + return; + }else{ + break; + } + } + } + } + return; + +} + void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes)[10], int &iters) { uint32_t inputRawCacheIndex = usedRawCacheIndexes[iters]; @@ -191,7 +343,7 @@ void DatabaseKVAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIndexes) // find a slot into my indexes // Goldilocks::Element key_hashed[4]; - hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]); + hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]); Goldilocks::Element *inputKey = &key_hashed[0]; uint32_t minRawCacheIndex = UINT32_MAX; int pos = -1; @@ -265,16 +417,17 @@ bool DatabaseKVAssociativeCache::findKey( const uint64_t version, const Goldiloc keys[cacheIndexKey + 2].fe == key[2].fe && keys[cacheIndexKey + 3].fe == key[3].fe){ - if( versions[cacheIndexVersions] <= version){ //rick: I assume they are ordered + if( version >= versions[cacheIndexVersions] ){ uint32_t cacheIndexValue = cacheIndex; ++hits; value = values[cacheIndexValue]; return true; } + if(versions[cacheIndexVersions+1]==UINT64_MAX) return false; //No more versions cacheIndex = versions[cacheIndexVersions+1] & cacheMask; cacheIndexKey = cacheIndex * 4; cacheIndexVersions = cacheIndex * 2; - + }else{ if(j>0){ return false; diff --git a/src/hashdb64/database_kv_associative_cache.hpp b/src/hashdb64/database_kv_associative_cache.hpp index 36422d8e0..1fbecd580 100644 --- a/src/hashdb64/database_kv_associative_cache.hpp +++ b/src/hashdb64/database_kv_associative_cache.hpp @@ -7,6 +7,7 @@ #include "zklog.hpp" #include "zkmax.hpp" #include "poseidon_goldilocks.hpp" +#include "version_value.hpp" using namespace std; @@ -42,8 +43,12 @@ class DatabaseKVAssociativeCache ~DatabaseKVAssociativeCache(); void postConstruct(int log2IndexesSize_, int log2CacheSize_, string name_); - void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value, bool update); + void addKeyValueVersion(const uint64_t version, const Goldilocks::Element (&key)[4], const mpz_class &value); + void downstreamAddKeyZeroVersion(const uint64_t version, const Goldilocks::Element (&key)[4]); + void uploadKeyValueVersions(const Goldilocks::Element (&key)[4], vector &versionsValues); bool findKey( const uint64_t version, const Goldilocks::Element (&key)[4], mpz_class &value); + void getLastCachedVersions(const Goldilocks::Element (&key)[4], vector &versions, const int maxVersions); + inline bool enabled() const { return (log2IndexesSize > 0); }; inline uint32_t getCacheSize() const { return cacheSize; }; @@ -66,16 +71,6 @@ class DatabaseKVAssociativeCache PoseidonGoldilocks pg; pg.hash_seq(keyOut, key_hash_imput); }; - inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir - Goldilocks::Element key_hash_imput[12]; - for(int i=0; i<4; i++){ - key_hash_imput[i] = keyIn[i]; - key_hash_imput[i+4] = keyIn[i]; - key_hash_imput[i+8] = keyIn[i]; - } - PoseidonGoldilocks pg; - pg.hash_seq(keyOut, key_hash_imput); - }; }; #endif \ No newline at end of file diff --git a/src/hashdb64/database_versions_associtive_cache.cpp b/src/hashdb64/database_versions_associtive_cache.cpp index 9e00bb9ed..a36cb20f7 100644 --- a/src/hashdb64/database_versions_associtive_cache.cpp +++ b/src/hashdb64/database_versions_associtive_cache.cpp @@ -173,7 +173,7 @@ void DatabaseVersionsAssociativeCache::forcedInsertion(uint32_t (&usedRawCacheIn // find a slot into my indexes // Goldilocks::Element key_hashed[4]; - hashKey_p(key_hashed, &keys[(inputRawCacheIndex & cacheMask) * 4]); + hashKey(key_hashed, (Goldilocks::Element(&)[4])keys[(inputRawCacheIndex & cacheMask) * 4]); Goldilocks::Element *inputKey = &key_hashed[0]; uint32_t minRawCacheIndex = UINT32_MAX; int pos = -1; diff --git a/src/hashdb64/database_versions_associtive_cache.hpp b/src/hashdb64/database_versions_associtive_cache.hpp index 6f0f0da01..2d368bceb 100644 --- a/src/hashdb64/database_versions_associtive_cache.hpp +++ b/src/hashdb64/database_versions_associtive_cache.hpp @@ -63,16 +63,6 @@ class DatabaseVersionsAssociativeCache PoseidonGoldilocks pg; pg.hash_seq(keyOut, key_hash_imput); }; - inline void hashKey_p(Goldilocks::Element (&keyOut)[4], const Goldilocks::Element * keyIn) const{ //rick: convertir - Goldilocks::Element key_hash_imput[12]; - for(int i=0; i<4; i++){ - key_hash_imput[i] = keyIn[i]; - key_hash_imput[i+4] = keyIn[i]; - key_hash_imput[i+8] = keyIn[i]; - } - PoseidonGoldilocks pg; - pg.hash_seq(keyOut, key_hash_imput); - }; }; #endif diff --git a/src/hashdb64/multi_write_64.cpp b/src/hashdb64/multi_write_64.cpp index aa96a2f0e..28b8ab8b6 100644 --- a/src/hashdb64/multi_write_64.cpp +++ b/src/hashdb64/multi_write_64.cpp @@ -4,6 +4,19 @@ using namespace std; +uint64_t previousAvailableVersion(const uint64_t versionIn, const vector &versions) +{ + uint64_t versionOut = UINT64_MAX; + for (auto it = versions.begin(); it != versions.end(); ++it) + { + if (*it <= versionIn) + { + versionOut = *it; + } + } + return versionOut; +} + MultiWrite64::MultiWrite64(Goldilocks & fr) : fr(fr), lastFlushId(0), @@ -167,62 +180,96 @@ bool MultiWrite64::findProgram(const string &key, vector &value) return bResult; } -bool MultiWrite64::findKeyValue(const uint64_t version, KeyValue &kv){ +bool MultiWrite64::findKeyValue(const uint64_t version,const Goldilocks::Element (&key)[4], mpz_class &value){ + bool bResult = false; Lock(); + string keyStr_ = fea2string(fr, key[0], key[1], key[2], key[3]); + string keyStr = NormalizeToNFormat(keyStr_, 64); + + unordered_map>::const_iterator it_; - unordered_map::const_iterator it; - - // Search in data[pendingToFlushDataIndex].keyValue + // Search in data[pendingToFlushDataIndex].keyValueAIntray + // Very important to start locking for intray first since in has newever versions if (bResult == false) { - it = data[pendingToFlushDataIndex].keyValue.find(version); - if (it != data[pendingToFlushDataIndex].keyValue.end()) - { - kv = it->second; - bResult = true; + uint64_t versionPrevious = previousAvailableVersion(version, data[pendingToFlushDataIndex].keyVersionsIntray[keyStr]); + if(versionPrevious != UINT64_MAX){ + it_ = data[pendingToFlushDataIndex].keyValueAIntray.find(versionPrevious); + if (it_ != data[pendingToFlushDataIndex].keyValueAIntray.end()) + { + for(auto it2 = it_->second.begin(); it2 != it_->second.end(); ++it2){ + if(it2->key[0]==key[0] && it2->key[1]==key[1] && it2->key[2]==key[2] && it2->key[3]==key[3]){ + value = it2->value; + bResult = true; + break; + } + } -#ifdef LOG_DB_MULTI_WRITE_FIND_NODES - zklog.info("MultiWrite64::findKeyValue() data[pendingToFlushDataIndex].keyValue found version=" + to_string(version) + " key=" + fea2string(fr, it->second.key.fe) + " value=" + it->second.value.get_str()); -#endif + #ifdef LOG_DB_MULTI_WRITE_FIND_NODES + zklog.info("MultiWrite64::findkeyValueAIntray() data[pendingToFlushDataIndex].keyValueAIntray found version=" + to_string(version) + " key=" + keyStr + " value=" + value.get_str()); + #endif + } } } - // Search in data[pendingToFlushDataIndex].nodesIntray + map>::const_iterator it; + + // Search in data[pendingToFlushDataIndex].keyValueA if (bResult == false) { - it = data[pendingToFlushDataIndex].keyValueIntray.find(version); - if (it != data[pendingToFlushDataIndex].keyValueIntray.end()) - { - kv = it->second; - bResult = true; + uint64_t versionPrevious = previousAvailableVersion(version, data[pendingToFlushDataIndex].keyVersions[keyStr]); + if(versionPrevious != UINT64_MAX){ + it = data[pendingToFlushDataIndex].keyValueA.find(versionPrevious); + if (it != data[pendingToFlushDataIndex].keyValueA.end()) + { + for(auto it2 = it->second.begin(); it2 != it->second.end(); ++it2){ + if(it2->key[0]==key[0] && it2->key[1]==key[1] && it2->key[2]==key[2] && it2->key[3]==key[3]){ + value = it2->value; + bResult = true; + break; + } + } -#ifdef LOG_DB_MULTI_WRITE_FIND_NODES - zklog.info("MultiWrite64::findKeyValue() data[pendingToFlushDataIndex].keyValueIntray found version=" + to_string(version) + " key=" + fea2string(fr, it->second.key.fe) + " value=" + it->second.value.get_str()); -#endif + #ifdef LOG_DB_MULTI_WRITE_FIND_NODES + zklog.info("MultiWrite64::findkeyValueA() data[pendingToFlushDataIndex].keyValueA found version=" + to_string(version) + " key=" + keyStr + " value=" + value.get_str()); + #endif + } } } + // If there is still some data pending to be stored on database if (storingFlushId != storedFlushId) { - // Search in data[storingDataIndex].keyValue + + // Search in data[storingDataIndex].keyValueA if (bResult == false) { - it = data[storingDataIndex].keyValue.find(version); - if (it != data[storingDataIndex].keyValue.end()) - { - kv = it->second; - bResult = true; -#ifdef LOG_DB_MULTI_WRITE_FIND_NODES - zklog.info("MultiWrite64::findKeyValue() data[storingDataIndex].keyValueIntray found version=" + to_string(version) + " key=" + fea2string(fr, it->second.key.fe) + " value=" + it->second.value.get_str()); -#endif + uint64_t versionPrevious = previousAvailableVersion(version, data[storingDataIndex].keyVersions[keyStr]); + if(versionPrevious != UINT64_MAX){ + it = data[storingDataIndex].keyValueA.find(versionPrevious); + if (it != data[storingDataIndex].keyValueA.end()) + { + for(auto it2 = it->second.begin(); it2 != it->second.end(); ++it2){ + if(it2->key[0]==key[0] && it2->key[1]==key[1] && it2->key[2]==key[2] && it2->key[3]==key[3]){ + value = it2->value; + bResult = true; + break; + } + } + + #ifdef LOG_DB_MULTI_WRITE_FIND_NODES + zklog.info("MultiWrite64::findkeyValueA() data[storingDataIndex].keyValueA found version=" + to_string(version) + " key=" + keyStr + " value=" + value.get_str()); + #endif + } } } + - // data[storingDataIndex].nodesIntray must be empty - zkassert(data[storingDataIndex].keyValueIntray.size() == 0); + // data[storingDataIndex].keyValueIntray must be empty + zkassert(data[storingDataIndex].keyValueAIntray.size() == 0); } Unlock(); diff --git a/src/hashdb64/multi_write_64.hpp b/src/hashdb64/multi_write_64.hpp index cae40ba37..37c696293 100644 --- a/src/hashdb64/multi_write_64.hpp +++ b/src/hashdb64/multi_write_64.hpp @@ -34,7 +34,7 @@ class MultiWrite64 bool findNode(const string &key, string &value); bool findProgram(const string &key, vector &value); - bool findKeyValue(const uint64_t version, KeyValue &kv); + bool findKeyValue(const uint64_t version,const Goldilocks::Element (&key)[4], mpz_class &value); bool findVersion(const string &key, uint64_t &version); }; diff --git a/src/hashdb64/multi_write_data_64.hpp b/src/hashdb64/multi_write_data_64.hpp index 4936e161a..613e50ef4 100644 --- a/src/hashdb64/multi_write_data_64.hpp +++ b/src/hashdb64/multi_write_data_64.hpp @@ -3,6 +3,7 @@ #include #include +#include #include "definitions.hpp" #include "zklog.hpp" #include "multi_query.hpp" @@ -18,8 +19,10 @@ class MultiWriteData64 unordered_map programIntray; unordered_map nodes; unordered_map nodesIntray; - unordered_map keyValue; - unordered_map keyValueIntray; + map> keyValueA; //keyValue must be inserted to the db in order with respect to version! + unordered_map> keyValueAIntray; + unordered_map> keyVersions; + unordered_map> keyVersionsIntray; unordered_map version; unordered_map versionIntray; uint64_t latestVersion; @@ -38,8 +41,10 @@ class MultiWriteData64 programIntray.clear(); nodes.clear(); nodesIntray.clear(); - keyValue.clear(); - keyValueIntray.clear(); + keyValueA.clear(); + keyValueAIntray.clear(); + keyVersions.clear(); + keyVersionsIntray.clear(); version.clear(); versionIntray.clear(); nodesStateRoot.clear(); @@ -54,8 +59,10 @@ class MultiWriteData64 (program.size() == 0) && (programIntray.size() == 0) && (nodesStateRoot.size() == 0) && - (keyValue.size() == 0) && - (keyValueIntray.size() == 0) && + (keyValueA.size() == 0) && + (keyValueAIntray.size() == 0) && + (keyVersions.size() == 0) && + (keyVersionsIntray.size() == 0) && (version.size() == 0) && (versionIntray.size() == 0); } @@ -84,16 +91,22 @@ class MultiWriteData64 nodes.merge(nodesIntray); nodesIntray.clear(); } - if (keyValueIntray.size() > 0) + if (keyValueAIntray.size() > 0) { #ifdef LOG_DB_ACCEPT_INTRAY if (bSenderCalling) { - zklog.info("MultiWriteData64::acceptIntray() rescuing " + to_string(keyValueIntray.size()) + " keyValue pairs"); + zklog.info("MultiWriteData64::acceptIntray() rescuing " + to_string(keyValueAIntray.size()) + " keyValueA pairs"); } -#endif - keyValue.merge(keyValueIntray); - keyValueIntray.clear(); +#endif + for(auto it = keyValueAIntray.begin(); it != keyValueAIntray.end(); ++it){ + if(keyValueA.find(it->first) == keyValueA.end()){ + keyValueA[it->first] = it->second; + }else{ + keyValueA[it->first].insert(keyValueA[it->first].end(), it->second.begin(), it->second.end()); + } + } + keyValueAIntray.clear(); } if (versionIntray.size() > 0) { @@ -106,6 +119,23 @@ class MultiWriteData64 version.merge(versionIntray); versionIntray.clear(); } + if (keyVersionsIntray.size() > 0) + { +#ifdef LOG_DB_ACCEPT_INTRAY + if (bSenderCalling) + { + zklog.info("MultiWriteData64::acceptIntray() rescuing " + to_string(keyVersionsIntray.size()) + " keyVersions"); + } +#endif + for(auto it = keyVersionsIntray.begin(); it != keyVersionsIntray.end(); ++it){ + if(keyVersions.find(it->first) == keyVersions.end()){ + keyVersions[it->first] = it->second; + }else{ + keyVersions[it->first].insert(keyVersions[it->first].end(), it->second.begin(), it->second.end()); + } + } + keyVersionsIntray.clear(); + } } }; diff --git a/src/hashdb64/version_value.hpp b/src/hashdb64/version_value.hpp new file mode 100644 index 000000000..3eb18b37a --- /dev/null +++ b/src/hashdb64/version_value.hpp @@ -0,0 +1,13 @@ +#ifndef VERSION_VALUE_HPP +#define VERSION_VALUE_HPP + +#include + +class VersionValue +{ +public: + uint64_t version; + mpz_class value; +}; + +#endif \ No newline at end of file diff --git a/test/hashdb/database_associative_cache_test.cpp b/test/hashdb/database_associative_cache_test.cpp index 863ebaacb..0ba33b3e3 100644 --- a/test/hashdb/database_associative_cache_test.cpp +++ b/test/hashdb/database_associative_cache_test.cpp @@ -95,7 +95,7 @@ uint64_t DatabaseAssociativeCacheTest (void) keyScalar = i; scalar2fea(fr, keyScalar, key); valueScalar = (version/2)*NUMBER_OF_DB_CACHE_ADDS + i; - dbKVACache.addKeyValueVersion(version, key, valueScalar,update); + dbKVACache.addKeyValueVersion(version, key, valueScalar); } } for(version=0; version<=10; version++){