From b658ec970e0b4f6ec5ebacedbedca38b65d89f33 Mon Sep 17 00:00:00 2001 From: Mixficsol <838844609@qq.com> Date: Mon, 6 Nov 2023 03:42:14 -0600 Subject: [PATCH] add hash cache cmd (#251) add_hash_cache_cmd Co-authored-by: wuxianrong --- include/pika_hash.h | 52 ++++++ src/pika_command.cc | 28 ++-- src/pika_hash.cc | 399 ++++++++++++++++++++++++++++++++++++++------ 3 files changed, 410 insertions(+), 69 deletions(-) diff --git a/include/pika_hash.h b/include/pika_hash.h index e0872154fa..c9cf9bfaff 100644 --- a/include/pika_hash.h +++ b/include/pika_hash.h @@ -23,6 +23,8 @@ class HDelCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HDelCmd(*this); } @@ -30,7 +32,9 @@ class HDelCmd : public Cmd { private: std::string key_; std::vector fields_; + int32_t deleted_ = 0; void DoInitial() override; + rocksdb::Status s_; }; class HGetCmd : public Cmd { @@ -42,6 +46,9 @@ class HGetCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HGetCmd(*this); } @@ -49,6 +56,7 @@ class HGetCmd : public Cmd { private: std::string key_, field_; void DoInitial() override; + rocksdb::Status s_; }; class HGetallCmd : public Cmd { @@ -60,6 +68,9 @@ class HGetallCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HGetallCmd(*this); } @@ -67,6 +78,7 @@ class HGetallCmd : public Cmd { private: std::string key_; void DoInitial() override; + rocksdb::Status s_; }; class HSetCmd : public Cmd { @@ -78,6 +90,8 @@ class HSetCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HSetCmd(*this); } @@ -85,6 +99,7 @@ class HSetCmd : public Cmd { private: std::string key_, field_, value_; void DoInitial() override; + rocksdb::Status s_; }; class HExistsCmd : public Cmd { @@ -96,6 +111,9 @@ class HExistsCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HExistsCmd(*this); } @@ -103,6 +121,7 @@ class HExistsCmd : public Cmd { private: std::string key_, field_; void DoInitial() override; + rocksdb::Status s_; }; class HIncrbyCmd : public Cmd { @@ -114,6 +133,8 @@ class HIncrbyCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HIncrbyCmd(*this); } @@ -122,6 +143,7 @@ class HIncrbyCmd : public Cmd { std::string key_, field_; int64_t by_ = 0; void DoInitial() override; + rocksdb::Status s_; }; class HIncrbyfloatCmd : public Cmd { @@ -133,6 +155,8 @@ class HIncrbyfloatCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HIncrbyfloatCmd(*this); } @@ -140,6 +164,7 @@ class HIncrbyfloatCmd : public Cmd { private: std::string key_, field_, by_; void DoInitial() override; + rocksdb::Status s_; }; class HKeysCmd : public Cmd { @@ -151,6 +176,9 @@ class HKeysCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HKeysCmd(*this); } @@ -158,6 +186,7 @@ class HKeysCmd : public Cmd { private: std::string key_; void DoInitial() override; + rocksdb::Status s_; }; class HLenCmd : public Cmd { @@ -169,6 +198,9 @@ class HLenCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HLenCmd(*this); } @@ -176,6 +208,7 @@ class HLenCmd : public Cmd { private: std::string key_; void DoInitial() override; + rocksdb::Status s_; }; class HMgetCmd : public Cmd { @@ -187,6 +220,9 @@ class HMgetCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HMgetCmd(*this); } @@ -195,6 +231,7 @@ class HMgetCmd : public Cmd { std::string key_; std::vector fields_; void DoInitial() override; + rocksdb::Status s_; }; class HMsetCmd : public Cmd { @@ -206,6 +243,8 @@ class HMsetCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HMsetCmd(*this); } @@ -214,6 +253,7 @@ class HMsetCmd : public Cmd { std::string key_; std::vector fvs_; void DoInitial() override; + rocksdb::Status s_; }; class HSetnxCmd : public Cmd { @@ -225,6 +265,8 @@ class HSetnxCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HSetnxCmd(*this); } @@ -232,6 +274,7 @@ class HSetnxCmd : public Cmd { private: std::string key_, field_, value_; void DoInitial() override; + rocksdb::Status s_; }; class HStrlenCmd : public Cmd { @@ -243,6 +286,9 @@ class HStrlenCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HStrlenCmd(*this); } @@ -250,6 +296,7 @@ class HStrlenCmd : public Cmd { private: std::string key_, field_; void DoInitial() override; + rocksdb::Status s_; }; class HValsCmd : public Cmd { @@ -261,6 +308,9 @@ class HValsCmd : public Cmd { return res; } void Do(std::shared_ptr slot = nullptr) override; + void PreDo(std::shared_ptr slot = nullptr) override; + void DoFromCache(std::shared_ptr slot = nullptr) override; + void DoUpdateCache(std::shared_ptr slot = nullptr) override; void Split(std::shared_ptr slot, const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new HValsCmd(*this); } @@ -268,6 +318,7 @@ class HValsCmd : public Cmd { private: std::string key_, field_; void DoInitial() override; + rocksdb::Status s_; }; class HScanCmd : public Cmd { @@ -372,5 +423,6 @@ class PKHRScanRangeCmd : public Cmd { pattern_ = "*"; limit_ = 10; } + rocksdb::Status s_; }; #endif diff --git a/src/pika_command.cc b/src/pika_command.cc index 2005c5d4fd..3b394ffdf2 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -319,59 +319,59 @@ void InitCmdTable(CmdTable* cmd_table) { // Hash ////HDelCmd std::unique_ptr hdelptr = - std::make_unique(kCmdNameHDel, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHDel, -3, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHDel, std::move(hdelptr))); ////HSetCmd std::unique_ptr hsetptr = - std::make_unique(kCmdNameHSet, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHSet, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHSet, std::move(hsetptr))); ////HGetCmd std::unique_ptr hgetptr = - std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHGet, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHGet, std::move(hgetptr))); ////HGetallCmd std::unique_ptr hgetallptr = - std::make_unique(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHGetall, std::move(hgetallptr))); ////HExistsCmd std::unique_ptr hexistsptr = - std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHExists, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHExists, std::move(hexistsptr))); ////HIncrbyCmd std::unique_ptr hincrbyptr = - std::make_unique(kCmdNameHIncrby, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHIncrby, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHIncrby, std::move(hincrbyptr))); ////HIncrbyfloatCmd std::unique_ptr hincrbyfloatptr = - std::make_unique(kCmdNameHIncrbyfloat, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHIncrbyfloat, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHIncrbyfloat, std::move(hincrbyfloatptr))); ////HKeysCmd std::unique_ptr hkeysptr = - std::make_unique(kCmdNameHKeys, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHKeys, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHKeys, std::move(hkeysptr))); ////HLenCmd std::unique_ptr hlenptr = - std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHLen, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHLen, std::move(hlenptr))); ////HMgetCmd std::unique_ptr hmgetptr = - std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHMget, -3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHMget, std::move(hmgetptr))); ////HMsetCmd std::unique_ptr hmsetptr = - std::make_unique(kCmdNameHMset, -4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHMset, -4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHMset, std::move(hmsetptr))); ////HSetnxCmd std::unique_ptr hsetnxptr = - std::make_unique(kCmdNameHSetnx, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHSetnx, 4, kCmdFlagsWrite | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache); cmd_table->insert(std::pair>(kCmdNameHSetnx, std::move(hsetnxptr))); ////HStrlenCmd std::unique_ptr hstrlenptr = - std::make_unique(kCmdNameHStrlen, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHStrlen, 3, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHStrlen, std::move(hstrlenptr))); ////HValsCmd std::unique_ptr hvalsptr = - std::make_unique(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash); + std::make_unique(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsSingleSlot | kCmdFlagsHash | kCmdFlagsUpdateCache | kCmdFlagsDoFromCache | kCmdFlagsPreDo); cmd_table->insert(std::pair>(kCmdNameHVals, std::move(hvalsptr))); ////HScanCmd std::unique_ptr hscanptr = diff --git a/src/pika_hash.cc b/src/pika_hash.cc index e7ec39af66..ea2ef5fbd6 100644 --- a/src/pika_hash.cc +++ b/src/pika_hash.cc @@ -9,6 +9,7 @@ #include "include/pika_conf.h" #include "include/pika_slot_command.h" +#include "include/pika_cache.h" extern std::unique_ptr g_pika_conf; @@ -26,11 +27,22 @@ void HDelCmd::DoInitial() { void HDelCmd::Do(std::shared_ptr slot) { int32_t num = 0; - rocksdb::Status s = slot->db()->HDel(key_, fields_, &num); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HDel(key_, fields_, &num); + if (s_.ok() || s_.IsNotFound()) { res_.AppendInteger(num); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HDelCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HDelCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok() && deleted_ > 0) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HDel(CachePrefixKeyh, fields_); } } @@ -46,12 +58,23 @@ void HSetCmd::DoInitial() { void HSetCmd::Do(std::shared_ptr slot) { int32_t ret = 0; - rocksdb::Status s = slot->db()->HSet(key_, field_, value_, &ret); - if (s.ok()) { + s_ = slot->db()->HSet(key_, field_, value_, &ret); + if (s_.ok()) { res_.AppendContent(":" + std::to_string(ret)); AddSlotKey("h", key_, slot); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HSetCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HSetCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HSetIfKeyExist(CachePrefixKeyh, field_, value_); } } @@ -66,17 +89,43 @@ void HGetCmd::DoInitial() { void HGetCmd::Do(std::shared_ptr slot) { std::string value; - rocksdb::Status s = slot->db()->HGet(key_, field_, &value); - if (s.ok()) { + s_ = slot->db()->HGet(key_, field_, &value); + if (s_.ok()) { res_.AppendStringLenUint64(value.size()); res_.AppendContent(value); - } else if (s.IsNotFound()) { + } else if (s_.IsNotFound()) { res_.AppendContent("$-1"); + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HGetCmd::PreDo(std::shared_ptr slot) { + std::string value; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HGet(CachePrefixKeyh, field_, &value); + if (s.ok()) { + res_.AppendStringLen(value.size()); + res_.AppendContent(value); + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HGetCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HGetCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HGetallCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHGetall); @@ -91,13 +140,12 @@ void HGetallCmd::Do(std::shared_ptr slot) { int64_t next_cursor = 0; size_t raw_limit = g_pika_conf->max_client_response_size(); std::string raw; - rocksdb::Status s; std::vector fvs; do { fvs.clear(); - s = slot->db()->HScan(key_, cursor, "*", PIKA_SCAN_STEP_LENGTH, &fvs, &next_cursor); - if (!s.ok()) { + s_ = slot->db()->HScan(key_, cursor, "*", PIKA_SCAN_STEP_LENGTH, &fvs, &next_cursor); + if (!s_.ok()) { raw.clear(); total_fv = 0; break; @@ -117,14 +165,45 @@ void HGetallCmd::Do(std::shared_ptr slot) { } } while (cursor != 0); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLen(total_fv * 2); res_.AppendStringRaw(raw); + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HGetallCmd::PreDo(std::shared_ptr slot) { + std::vector fvs; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HGetall(CachePrefixKeyh, &fvs); + if (s.ok()) { + res_.AppendArrayLen(fvs.size() * 2); + for (const auto& fv : fvs) { + res_.AppendStringLen(fv.field.size()); + res_.AppendContent(fv.field); + res_.AppendStringLen(fv.value.size()); + res_.AppendContent(fv.value); + } + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HGetallCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HGetallCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HExistsCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHExists); @@ -135,16 +214,40 @@ void HExistsCmd::DoInitial() { } void HExistsCmd::Do(std::shared_ptr slot) { - rocksdb::Status s = slot->db()->HExists(key_, field_); + s_ = slot->db()->HExists(key_, field_); + if (s_.ok()) { + res_.AppendContent(":1"); + } else if (s_.IsNotFound()) { + res_.AppendContent(":0"); + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HExistsCmd::PreDo(std::shared_ptr slot) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HExists(CachePrefixKeyh, field_); if (s.ok()) { res_.AppendContent(":1"); } else if (s.IsNotFound()) { - res_.AppendContent(":0"); + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HExistsCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HExistsCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HIncrbyCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHIncrby); @@ -160,16 +263,27 @@ void HIncrbyCmd::DoInitial() { void HIncrbyCmd::Do(std::shared_ptr slot) { int64_t new_value; - rocksdb::Status s = slot->db()->HIncrby(key_, field_, by_, &new_value); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HIncrby(key_, field_, by_, &new_value); + if (s_.ok() || s_.IsNotFound()) { res_.AppendContent(":" + std::to_string(new_value)); AddSlotKey("h", key_, slot); - } else if (s.IsCorruption() && s.ToString() == "Corruption: hash value is not an integer") { + } else if (s_.IsCorruption() && s_.ToString() == "Corruption: hash value is not an integer") { res_.SetRes(CmdRes::kInvalidInt); - } else if (s.IsInvalidArgument()) { + } else if (s_.IsInvalidArgument()) { res_.SetRes(CmdRes::kOverFlow); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HIncrbyCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HIncrbyCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HIncrbyxx(CachePrefixKeyh, field_, by_); } } @@ -185,17 +299,31 @@ void HIncrbyfloatCmd::DoInitial() { void HIncrbyfloatCmd::Do(std::shared_ptr slot) { std::string new_value; - rocksdb::Status s = slot->db()->HIncrbyfloat(key_, field_, by_, &new_value); - if (s.ok()) { + s_ = slot->db()->HIncrbyfloat(key_, field_, by_, &new_value); + if (s_.ok()) { res_.AppendStringLenUint64(new_value.size()); res_.AppendContent(new_value); AddSlotKey("h", key_, slot); - } else if (s.IsCorruption() && s.ToString() == "Corruption: value is not a vaild float") { + } else if (s_.IsCorruption() && s_.ToString() == "Corruption: value is not a vaild float") { res_.SetRes(CmdRes::kInvalidFloat); - } else if (s.IsInvalidArgument()) { + } else if (s_.IsInvalidArgument()) { res_.SetRes(CmdRes::kOverFlow); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HIncrbyfloatCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HIncrbyfloatCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + long double long_double_by; + if (storage::StrToLongDouble(by_.data(), by_.size(), &long_double_by) != -1) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HIncrbyfloatxx(CachePrefixKeyh, field_, long_double_by); + } } } @@ -209,17 +337,45 @@ void HKeysCmd::DoInitial() { void HKeysCmd::Do(std::shared_ptr slot) { std::vector fields; - rocksdb::Status s = slot->db()->HKeys(key_, &fields); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HKeys(key_, &fields); + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLenUint64(fields.size()); for (const auto& field : fields) { res_.AppendString(field); } + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HKeysCmd::PreDo(std::shared_ptr slot) { + std::vector fields; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HKeys(CachePrefixKeyh, &fields); + if (s.ok()) { + res_.AppendArrayLen(fields.size()); + for (const auto& field : fields) { + res_.AppendString(field); + } + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HKeysCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HKeysCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HLenCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHLen); @@ -230,14 +386,39 @@ void HLenCmd::DoInitial() { void HLenCmd::Do(std::shared_ptr slot) { int32_t len = 0; - rocksdb::Status s = slot->db()->HLen(key_, &len); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HLen(key_, &len); + if (s_.ok() || s_.IsNotFound()) { + res_.AppendInteger(len); + } else { + res_.SetRes(CmdRes::kErrOther, "something wrong in hlen"); + } +} + +void HLenCmd::PreDo(std::shared_ptr slot) { + uint64_t len = 0; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HLen(CachePrefixKeyh, &len); + if (s.ok()) { res_.AppendInteger(len); + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, "something wrong in hlen"); } } +void HLenCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HLenCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HMgetCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHMget); @@ -252,8 +433,8 @@ void HMgetCmd::DoInitial() { void HMgetCmd::Do(std::shared_ptr slot) { std::vector vss; - rocksdb::Status s = slot->db()->HMGet(key_, fields_, &vss); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HMGet(key_, fields_, &vss); + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLenUint64(vss.size()); for (const auto& vs : vss) { if (vs.status.ok()) { @@ -263,11 +444,44 @@ void HMgetCmd::Do(std::shared_ptr slot) { res_.AppendContent("$-1"); } } + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HMgetCmd::PreDo(std::shared_ptr slot) { + std::vector vss; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HMGet(CachePrefixKeyh, fields_, &vss); + if (s.ok()) { + res_.AppendArrayLen(vss.size()); + for (const auto& vs : vss) { + if (vs.status.ok()) { + res_.AppendStringLen(vs.value.size()); + res_.AppendContent(vs.value); + } else { + res_.AppendContent("$-1"); + } + } + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HMgetCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HMgetCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HMsetCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHMset); @@ -287,12 +501,23 @@ void HMsetCmd::DoInitial() { } void HMsetCmd::Do(std::shared_ptr slot) { - rocksdb::Status s = slot->db()->HMSet(key_, fvs_); - if (s.ok()) { + s_ = slot->db()->HMSet(key_, fvs_); + if (s_.ok()) { res_.SetRes(CmdRes::kOk); AddSlotKey("h", key_, slot); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HMsetCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HMsetCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HMSetxx(CachePrefixKeyh, fvs_); } } @@ -308,12 +533,23 @@ void HSetnxCmd::DoInitial() { void HSetnxCmd::Do(std::shared_ptr slot) { int32_t ret = 0; - rocksdb::Status s = slot->db()->HSetnx(key_, field_, value_, &ret); - if (s.ok()) { + s_ = slot->db()->HSetnx(key_, field_, value_, &ret); + if (s_.ok()) { res_.AppendContent(":" + std::to_string(ret)); AddSlotKey("h", key_, slot); } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HSetnxCmd::DoFromCache(std::shared_ptr slot) { + Do(slot); +} + +void HSetnxCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->HSetIfKeyExistAndFieldNotExist(CachePrefixKeyh, field_, value_); } } @@ -328,12 +564,38 @@ void HStrlenCmd::DoInitial() { void HStrlenCmd::Do(std::shared_ptr slot) { int32_t len = 0; - rocksdb::Status s = slot->db()->HStrlen(key_, field_, &len); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HStrlen(key_, field_, &len); + if (s_.ok() || s_.IsNotFound()) { + res_.AppendInteger(len); + } else { + res_.SetRes(CmdRes::kErrOther, "something wrong in hstrlen"); + } +} + +void HStrlenCmd::PreDo(std::shared_ptr slot) { + uint64_t len = 0; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HStrlen(CachePrefixKeyh, field_, &len); + if (s.ok()) { res_.AppendInteger(len); + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, "something wrong in hstrlen"); } + return; +} + +void HStrlenCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HStrlenCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } } void HValsCmd::DoInitial() { @@ -346,18 +608,47 @@ void HValsCmd::DoInitial() { void HValsCmd::Do(std::shared_ptr slot) { std::vector values; - rocksdb::Status s = slot->db()->HVals(key_, &values); - if (s.ok() || s.IsNotFound()) { + s_ = slot->db()->HVals(key_, &values); + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLenUint64(values.size()); for (const auto& value : values) { res_.AppendStringLenUint64(value.size()); res_.AppendContent(value); } + } else { + res_.SetRes(CmdRes::kErrOther, s_.ToString()); + } +} + +void HValsCmd::PreDo(std::shared_ptr slot) { + std::vector values; + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + auto s = slot->cache()->HVals(CachePrefixKeyh, &values); + if (s.ok()) { + res_.AppendArrayLen(values.size()); + for (const auto& value : values) { + res_.AppendStringLen(value.size()); + res_.AppendContent(value); + } + } else if (s.IsNotFound()) { + res_.SetRes(CmdRes::kCacheMiss); } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } } +void HValsCmd::DoFromCache(std::shared_ptr slot) { + res_.clear(); + Do(slot); +} + +void HValsCmd::DoUpdateCache(std::shared_ptr slot) { + if (s_.ok()) { + std::string CachePrefixKeyh = PCacheKeyPrefixH + key_; + slot->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_HASH, CachePrefixKeyh); + } +} + void HScanCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameHScan); @@ -400,9 +691,9 @@ void HScanCmd::DoInitial() { void HScanCmd::Do(std::shared_ptr slot) { int64_t next_cursor = 0; std::vector field_values; - rocksdb::Status s = slot->db()->HScan(key_, cursor_, pattern_, count_, &field_values, &next_cursor); + s_ = slot->db()->HScan(key_, cursor_, pattern_, count_, &field_values, &next_cursor); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { res_.AppendContent("*2"); char buf[32]; int32_t len = pstd::ll2string(buf, sizeof(buf), next_cursor); @@ -415,7 +706,7 @@ void HScanCmd::Do(std::shared_ptr slot) { res_.AppendString(field_value.value); } } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } @@ -458,9 +749,9 @@ void HScanxCmd::DoInitial() { void HScanxCmd::Do(std::shared_ptr slot) { std::string next_field; std::vector field_values; - rocksdb::Status s = slot->db()->HScanx(key_, start_field_, pattern_, count_, &field_values, &next_field); + s_ = slot->db()->HScanx(key_, start_field_, pattern_, count_, &field_values, &next_field); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLen(2); res_.AppendStringLenUint64(next_field.size()); res_.AppendContent(next_field); @@ -471,7 +762,7 @@ void HScanxCmd::Do(std::shared_ptr slot) { res_.AppendString(field_value.value); } } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } @@ -511,10 +802,9 @@ void PKHScanRangeCmd::DoInitial() { void PKHScanRangeCmd::Do(std::shared_ptr slot) { std::string next_field; std::vector field_values; - rocksdb::Status s = - slot->db()->PKHScanRange(key_, field_start_, field_end_, pattern_, static_cast(limit_), &field_values, &next_field); + s_ = slot->db()->PKHScanRange(key_, field_start_, field_end_, pattern_, static_cast(limit_), &field_values, &next_field); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLen(2); res_.AppendString(next_field); @@ -524,7 +814,7 @@ void PKHScanRangeCmd::Do(std::shared_ptr slot) { res_.AppendString(field_value.value); } } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); } } @@ -564,10 +854,9 @@ void PKHRScanRangeCmd::DoInitial() { void PKHRScanRangeCmd::Do(std::shared_ptr slot) { std::string next_field; std::vector field_values; - rocksdb::Status s = - slot->db()->PKHRScanRange(key_, field_start_, field_end_, pattern_, static_cast(limit_), &field_values, &next_field); + s_ = slot->db()->PKHRScanRange(key_, field_start_, field_end_, pattern_, static_cast(limit_), &field_values, &next_field); - if (s.ok() || s.IsNotFound()) { + if (s_.ok() || s_.IsNotFound()) { res_.AppendArrayLen(2); res_.AppendString(next_field); @@ -577,6 +866,6 @@ void PKHRScanRangeCmd::Do(std::shared_ptr slot) { res_.AppendString(field_value.value); } } else { - res_.SetRes(CmdRes::kErrOther, s.ToString()); + res_.SetRes(CmdRes::kErrOther, s_.ToString()); } }