Skip to content

Commit

Permalink
fix:cache bug && add cache test (OpenAtomFoundation#2225)
Browse files Browse the repository at this point in the history
* fix:cache bug

* add cache test

* fix hash

* fix

* fix init ttl

* fix init

* change lock

* fix mget with ttl

* fix mget && add cache test

* delete log

* fix bitpop

* fix flag

* fix bitpop

---------

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin authored Dec 25, 2023
1 parent 55e5fd6 commit cb6256c
Show file tree
Hide file tree
Showing 18 changed files with 578 additions and 109 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -632,9 +632,9 @@ ExternalProject_Add(rocksdb

ExternalProject_Add(rediscache
URL
https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.5.tar.gz
https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.7.tar.gz
URL_HASH
MD5=99e4d0bde20811a6058a6aa482c18711
MD5=02c8aadc018dd8d4d3803cc420d1d75b
DOWNLOAD_NO_PROGRESS
1
UPDATE_COMMAND
Expand Down
2 changes: 2 additions & 0 deletions include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<

rocksdb::Status Init(uint32_t cache_num, cache::CacheConfig *cache_cfg);
rocksdb::Status Reset(uint32_t cache_num, cache::CacheConfig *cache_cfg = nullptr);
std::map<storage::DataType, int64_t> TTL(std::string &key, std::map<storage::DataType, rocksdb::Status>* type_status);
void ResetConfig(cache::CacheConfig *cache_cfg);
void Destroy(void);
void SetCacheStatus(int status);
Expand All @@ -70,6 +71,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
rocksdb::Status Persist(std::string &key);
rocksdb::Status Type(std::string &key, std::string *value);
rocksdb::Status RandomKey(std::string *key);
rocksdb::Status GetType(const std::string& key, bool single, std::vector<std::string>& types);

// String Commands
rocksdb::Status Set(std::string &key, std::string &value, int64_t ttl);
Expand Down
1 change: 0 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ class ExistsCmd : public Cmd {
std::vector<std::string> keys_;
int64_t split_res_ = 0;
void DoInitial() override;
rocksdb::Status s_;
};

class ExpireCmd : public Cmd {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3011,7 +3011,7 @@ void DisableWalCmd::Do(std::shared_ptr<Slot> slot) {

void CacheCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameDisableWal);
res_.SetRes(CmdRes::kWrongNum, kCmdNameCache);
return;
}
if (!strcasecmp(argv_[1].data(), "clear")) {
Expand Down
128 changes: 128 additions & 0 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,63 @@ Status PikaCache::TTL(std::string &key, int64_t *ttl) {
return caches_[cache_index]->TTL(key, ttl);
}

std::map<storage::DataType, int64_t> PikaCache::TTL(std::string &key, std::map<storage::DataType, Status>* type_status) {
Status s;
std::map<storage::DataType, int64_t> ret;
int64_t timestamp = 0;

std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
int cache_indexk = CacheIndex(CacheKeyPrefixK);
s = caches_[cache_indexk]->TTL(CacheKeyPrefixK, &timestamp);
if (s.ok() || s.IsNotFound()) {
ret[storage::DataType::kStrings] = timestamp;
} else if (!s.IsNotFound()) {
ret[storage::DataType::kStrings] = -3;
(*type_status)[storage::DataType::kStrings] = s;
}

std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
int cache_indexh = CacheIndex(CacheKeyPrefixH);
s = caches_[cache_indexh]->TTL(CacheKeyPrefixH, &timestamp);
if (s.ok() || s.IsNotFound()) {
ret[storage::DataType::kHashes] = timestamp;
} else if (!s.IsNotFound()) {
ret[storage::DataType::kHashes] = -3;
(*type_status)[storage::DataType::kHashes] = s;
}

std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
int cache_indexl = CacheIndex(CacheKeyPrefixL);
s = caches_[cache_indexl]->TTL(CacheKeyPrefixL, &timestamp);
if (s.ok() || s.IsNotFound()) {
ret[storage::DataType::kLists] = timestamp;
} else if (!s.IsNotFound()) {
ret[storage::DataType::kLists] = -3;
(*type_status)[storage::DataType::kLists] = s;
}

std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
int cache_indexs = CacheIndex(CacheKeyPrefixS);
s = caches_[cache_indexs]->TTL(CacheKeyPrefixS, &timestamp);
if (s.ok() || s.IsNotFound()) {
ret[storage::DataType::kSets] = timestamp;
} else if (!s.IsNotFound()) {
ret[storage::DataType::kSets] = -3;
(*type_status)[storage::DataType::kSets] = s;
}

std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
s = caches_[cache_indexz]->TTL(CacheKeyPrefixZ, &timestamp);
if (s.ok() || s.IsNotFound()) {
ret[storage::DataType::kZSets] = timestamp;
} else if (!s.IsNotFound()) {
ret[storage::DataType::kZSets] = -3;
(*type_status)[storage::DataType::kZSets] = s;
}
return ret;
}

Status PikaCache::Persist(std::string &key) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
Expand Down Expand Up @@ -164,6 +221,77 @@ Status PikaCache::RandomKey(std::string *key) {
return s;
}

Status PikaCache::GetType(const std::string& key, bool single, std::vector<std::string>& types) {
types.clear();

Status s;
std::string value;
std::string CacheKeyPrefixK = PCacheKeyPrefixK + key;
int cache_indexk = CacheIndex(CacheKeyPrefixK);
s = caches_[cache_indexk]->Get(CacheKeyPrefixK, &value);
if (s.ok()) {
types.emplace_back("string");
} else if (!s.IsNotFound()) {
return s;
}
if (single && !types.empty()) {
return s;
}

uint64_t hashes_len = 0;
std::string CacheKeyPrefixH = PCacheKeyPrefixH + key;
int cache_indexh = CacheIndex(CacheKeyPrefixH);
s = caches_[cache_indexh]->HLen(CacheKeyPrefixH, &hashes_len);
if (s.ok() && hashes_len != 0) {
types.emplace_back("hash");
} else if (!s.IsNotFound()) {
return s;
}
if (single && !types.empty()) {
return s;
}

uint64_t lists_len = 0;
std::string CacheKeyPrefixL = PCacheKeyPrefixL + key;
int cache_indexl = CacheIndex(CacheKeyPrefixL);
s = caches_[cache_indexl]->LLen(CacheKeyPrefixL, &lists_len);
if (s.ok() && lists_len != 0) {
types.emplace_back("list");
} else if (!s.IsNotFound()) {
return s;
}
if (single && !types.empty()) {
return s;
}

uint64_t zsets_size = 0;
std::string CacheKeyPrefixZ = PCacheKeyPrefixZ + key;
int cache_indexz = CacheIndex(CacheKeyPrefixZ);
s = caches_[cache_indexz]->ZCard(CacheKeyPrefixZ, &zsets_size);
if (s.ok() && zsets_size != 0) {
types.emplace_back("zset");
} else if (!s.IsNotFound()) {
return s;
}
if (single && !types.empty()) {
return s;
}

uint64_t sets_size = 0;
std::string CacheKeyPrefixS = PCacheKeyPrefixS + key;
int cache_indexs = CacheIndex(CacheKeyPrefixS);
s = caches_[cache_indexs]->SCard(CacheKeyPrefixS, &sets_size);
if (s.ok() && sets_size != 0) {
types.emplace_back("set");
} else if (!s.IsNotFound()) {
return s;
}
if (single && types.empty()) {
types.emplace_back("none");
}
return Status::OK();
}

/*-----------------------------------------------------------------------------
* String Commands
*----------------------------------------------------------------------------*/
Expand Down
2 changes: 1 addition & 1 deletion src/pika_cache_load_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ bool PikaCacheLoadThread::LoadZset(std::string &key, const std::shared_ptr<Slot>
}

bool PikaCacheLoadThread::LoadKey(const char key_type, std::string &key, const std::shared_ptr<Slot>& slot) {
pstd::lock::MultiRecordLock record_lock(slot->LockMgr());
pstd::lock::ScopeRecordLock record_lock(slot->LockMgr(), key);
switch (key_type) {
case 'k':
return LoadKV(key, slot);
Expand Down
33 changes: 17 additions & 16 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSetnx, std::move(setnxptr)));
////SetexCmd
std::unique_ptr<Cmd> setexptr =
std::make_unique<SetexCmd>(kCmdNameSetex, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
std::make_unique<SetexCmd>(kCmdNameSetex, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSetex, std::move(setexptr)));
////PsetexCmd
std::unique_ptr<Cmd> psetexptr =
std::make_unique<PsetexCmd>(kCmdNamePsetex, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv);
std::make_unique<PsetexCmd>(kCmdNamePsetex, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePsetex, std::move(psetexptr)));
////DelvxCmd
std::unique_ptr<Cmd> delvxptr =
Expand Down Expand Up @@ -281,11 +281,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePexpire, std::move(pexpireptr)));
////ExpireatCmd
std::unique_ptr<Cmd> expireatptr =
std::make_unique<ExpireatCmd>(kCmdNameExpireat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv);
std::make_unique<ExpireatCmd>(kCmdNameExpireat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameExpireat, std::move(expireatptr)));
////PexpireatCmd
std::unique_ptr<Cmd> pexpireatptr =
std::make_unique<PexpireatCmd>(kCmdNamePexpireat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
std::make_unique<PexpireatCmd>(kCmdNamePexpireat, 3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsKv | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePexpireat, std::move(pexpireatptr)));
////TtlCmd
std::unique_ptr<Cmd> ttlptr =
Expand Down Expand Up @@ -343,7 +343,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
Expand Down Expand Up @@ -383,7 +383,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHStrlen, std::move(hstrlenptr)));
////HValsCmd
std::unique_ptr<Cmd> hvalsptr =
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHVals, std::move(hvalsptr)));
////HScanCmd
std::unique_ptr<Cmd> hscanptr =
Expand Down Expand Up @@ -482,11 +482,11 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRevrange, std::move(zrevrangeptr)));
////ZRangebyscoreCmd
std::unique_ptr<Cmd> zrangebyscoreptr = std::make_unique<ZRangebyscoreCmd>(
kCmdNameZRangebyscore, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset| kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
kCmdNameZRangebyscore, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRangebyscore, std::move(zrangebyscoreptr)));
////ZRevrangebyscoreCmd
std::unique_ptr<Cmd> zrevrangebyscoreptr = std::make_unique<ZRevrangebyscoreCmd>(
kCmdNameZRevrangebyscore, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset| kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
kCmdNameZRevrangebyscore, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset);
cmd_table->insert(
std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRevrangebyscore, std::move(zrevrangebyscoreptr)));
////ZCountCmd
Expand Down Expand Up @@ -519,15 +519,15 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZScore, std::move(zscoreptr)));
////ZRangebylexCmd
std::unique_ptr<Cmd> zrangebylexptr =
std::make_unique<ZRangebylexCmd>(kCmdNameZRangebylex, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
std::make_unique<ZRangebylexCmd>(kCmdNameZRangebylex, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRangebylex, std::move(zrangebylexptr)));
////ZRevrangebylexCmd
std::unique_ptr<Cmd> zrevrangebylexptr = std::make_unique<ZRevrangebylexCmd>(
kCmdNameZRevrangebylex, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
kCmdNameZRevrangebylex, -4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZRevrangebylex, std::move(zrevrangebylexptr)));
////ZLexcountCmd
std::unique_ptr<Cmd> zlexcountptr =
std::make_unique<ZLexcountCmd>(kCmdNameZLexcount, 4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset| kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
std::make_unique<ZLexcountCmd>(kCmdNameZLexcount, 4, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsZset);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameZLexcount, std::move(zlexcountptr)));
////ZRemrangebyrankCmd
std::unique_ptr<Cmd> zremrangebyrankptr = std::make_unique<ZRemrangebyrankCmd>(
Expand Down Expand Up @@ -617,23 +617,23 @@ void InitCmdTable(CmdTable* cmd_table) {
// BitMap
////bitsetCmd
std::unique_ptr<Cmd> bitsetptr =
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsBit | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsBit);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitSet, std::move(bitsetptr)));
////bitgetCmd
std::unique_ptr<Cmd> bitgetptr =
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache);
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitGet, std::move(bitgetptr)));
////bitcountCmd
std::unique_ptr<Cmd> bitcountptr =
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache);
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitCount, std::move(bitcountptr)));
////bitposCmd
std::unique_ptr<Cmd> bitposptr =
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache);
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsBit);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitPos, std::move(bitposptr)));
////bitopCmd
std::unique_ptr<Cmd> bitopptr =
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsBit | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache | kCmdFlagsReadCache);
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsMultiSlot | kCmdFlagsBit);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitOp, std::move(bitopptr)));

// HyperLogLog
Expand Down Expand Up @@ -851,6 +851,7 @@ void Cmd::DoCommand(const std::shared_ptr<Slot>& slot, const HintKeys& hint_keys
ReadCache(slot);
}
if (is_read() && res().CacheMiss()) {
pstd::lock::MultiScopeRecordLock record_lock(slot->LockMgr(), current_key());
DoThroughDB(slot);
if (IsNeedUpdateCache()) {
DoUpdateCache(slot);
Expand Down
Loading

0 comments on commit cb6256c

Please sign in to comment.