From 322619ffef6e90799d1771698b9b011a880512a9 Mon Sep 17 00:00:00 2001 From: chejinge <945997690@qq.com> Date: Wed, 7 Aug 2024 15:52:10 +0800 Subject: [PATCH] fix: PkPatternMatchDel inconsistent between rediscache and db (#2839) * fix: PkPatternMatchDel inconsistent between rediscache and db --------- Co-authored-by: haiyang426 <51285701+haiyang426@users.noreply.github.com> Co-authored-by: chejinge --- CMakeLists.txt | 1 - conf/pika.conf | 5 +-- include/pika_admin.h | 4 ++ src/pika_admin.cc | 59 +++++++++++++++++++++++++-- src/storage/include/storage/storage.h | 2 +- src/storage/src/redis.h | 3 +- src/storage/src/redis_hashes.cc | 24 ++++------- src/storage/src/redis_hashes.h | 2 +- src/storage/src/redis_lists.cc | 24 ++++------- src/storage/src/redis_lists.h | 2 +- src/storage/src/redis_sets.cc | 24 ++++------- src/storage/src/redis_sets.h | 2 +- src/storage/src/redis_streams.cc | 24 ++++------- src/storage/src/redis_streams.h | 2 +- src/storage/src/redis_strings.cc | 28 ++++--------- src/storage/src/redis_strings.h | 6 +-- src/storage/src/redis_zsets.cc | 24 ++++------- src/storage/src/redis_zsets.h | 3 +- src/storage/src/storage.cc | 18 ++++---- src/storage/tests/keys_test.cc | 2 - 20 files changed, 134 insertions(+), 125 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index ad9b7628c8..22a2bcf383 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -658,7 +658,6 @@ ExternalProject_Add(rediscache set(REDISCACHE_INCLUDE_DIR ${INSTALL_INCLUDEDIR}) set(REDISCACHE_LIBRARY ${INSTALL_LIBDIR}/librediscache.a) - option(USE_PIKA_TOOLS "compile pika-tools" OFF) if (USE_PIKA_TOOLS) ExternalProject_Add(hiredis diff --git a/conf/pika.conf b/conf/pika.conf index 51859ec6e7..e3d49c8b4e 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -255,12 +255,9 @@ slave-priority : 100 # The disable_auto_compactions option is [true | false] disable_auto_compactions : false -<<<<<<< HEAD -======= # Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task # it's recommended to increase it's value if large compaction is found in you instance max-subcompactions : 1 ->>>>>>> f95f867c (fix: Revised RocksDB-Related Parameters in Pika (#2728)) # The minimum disk usage ratio for checking resume. # If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume. # Its default value is 0.7. @@ -501,7 +498,7 @@ default-slot-num : 1024 # [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. # [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). throttle-bytes-per-second : 207200000 -<<<<<<< HEAD + # Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. # [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command # [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. diff --git a/include/pika_admin.h b/include/pika_admin.h index ded28649d2..0c3a37d6f6 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -479,13 +479,17 @@ class PKPatternMatchDelCmd : public Cmd { PKPatternMatchDelCmd(const std::string& name, int arity, uint32_t flag) : Cmd(name, arity, flag, static_cast(AclCategory::ADMIN)) {} void Do() override; + void DoThroughDB() override; + void DoUpdateCache() override; void Split(const HintKeys& hint_keys) override {}; void Merge() override {}; Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); } private: storage::DataType type_ = storage::kAll; + std::vector remove_keys_; std::string pattern_; + int64_t max_count_; void DoInitial() override; }; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index ca55b5b76f..deba49f5c5 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -3098,15 +3098,66 @@ void PKPatternMatchDelCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel); return; } + max_count_ = storage::BATCH_DELETE_LIMIT; + if (argv_.size() > 3) { + if (pstd::string2int(argv_[3].data(), argv_[3].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) { + res_.SetRes(CmdRes::kInvalidInt); + return; + } + } } void PKPatternMatchDelCmd::Do() { - int ret = 0; - rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret); - if (s.ok()) { - res_.AppendInteger(ret); + int64_t count = 0; + rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_, max_count_); + + if(s.ok()) { + res_.AppendInteger(count); + s_ = rocksdb::Status::OK(); + for (const auto& key : remove_keys_) { + RemSlotKey(key, db_); + } } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); + if (count >= 0) { + s_ = rocksdb::Status::OK(); + for (const auto& key : remove_keys_) { + RemSlotKey(key, db_); + } + } + } +} + +void PKPatternMatchDelCmd::DoThroughDB() { + Do(); +} + +void PKPatternMatchDelCmd::DoUpdateCache() { + if (s_.ok()) { + std::vector v; + for (auto key : remove_keys_) { + // only delete the corresponding prefix + switch (type_) { + case storage::kSets: + v.emplace_back(PCacheKeyPrefixS + key); + break; + case storage::kLists: + v.emplace_back(PCacheKeyPrefixL + key); + break; + case storage::kStrings: + v.emplace_back(PCacheKeyPrefixK + key); + break; + case storage::kZSets: + v.emplace_back(PCacheKeyPrefixZ + key); + break; + case storage::kHashes: + v.emplace_back(PCacheKeyPrefixH + key); + break; + default: + break; + } + } + db_->cache()->Del(v); } } diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index 84b2132255..540bac5880 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -988,7 +988,7 @@ class Storage { // Traverses the database of the specified type, removing the Key that matches // the pattern - Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret); + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count); // Iterate over a collection of elements // return next_key that the user need to use as the start_key argument diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 24880ac4a3..21eaa2aa94 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -98,7 +98,6 @@ class Redis { virtual Status GetProperty(const std::string& property, uint64_t* out) = 0; virtual Status ScanKeyNum(KeyInfo* key_info) = 0; virtual Status ScanKeys(const std::string& pattern, std::vector* keys) = 0; - virtual Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) = 0; // Keys Commands virtual Status Expire(const Slice& key, int32_t ttl) = 0; @@ -110,7 +109,7 @@ class Redis { virtual Status Expireat(const Slice& key, int32_t timestamp) = 0; virtual Status Persist(const Slice& key) = 0; virtual Status TTL(const Slice& key, int64_t* timestamp) = 0; - + virtual Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) = 0; Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys); Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold); Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold); diff --git a/src/storage/src/redis_hashes.cc b/src/storage/src/redis_hashes.cc index ec493014fb..b885a487dd 100644 --- a/src/storage/src/redis_hashes.cc +++ b/src/storage/src/redis_hashes.cc @@ -145,7 +145,7 @@ Status RedisHashes::ScanKeys(const std::string& pattern, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -154,7 +154,7 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) std::string key; std::string meta_value; - int32_t total_delete = 0; + int64_t total_delete = 0; Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]); @@ -162,7 +162,7 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) delete iter; }; iter->SeekToFirst(); - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); meta_value = iter->value().ToString(); ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value); @@ -170,24 +170,18 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { parsed_hashes_meta_value.InitialMetaValue(); batch.Put(handles_[0], key, meta_value); - } - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast( batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast(batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } diff --git a/src/storage/src/redis_hashes.h b/src/storage/src/redis_hashes.h index 6733748123..cc6c7c6529 100644 --- a/src/storage/src/redis_hashes.h +++ b/src/storage/src/redis_hashes.h @@ -26,7 +26,7 @@ class RedisHashes : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; // Hashes Commands Status HDel(const Slice& key, const std::vector& fields, int32_t* ret); diff --git a/src/storage/src/redis_lists.cc b/src/storage/src/redis_lists.cc index 4a364e60c1..09a045a4d4 100644 --- a/src/storage/src/redis_lists.cc +++ b/src/storage/src/redis_lists.cc @@ -152,7 +152,7 @@ Status RedisLists::ScanKeys(const std::string& pattern, std::vector return Status::OK(); } -Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { +Status RedisLists::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -161,7 +161,7 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { std::string key; std::string meta_value; - int32_t total_delete = 0; + int64_t total_delete = 0; Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]); @@ -169,7 +169,7 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { delete iter; }; iter->SeekToFirst(); - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); meta_value = iter->value().ToString(); ParsedListsMetaValue parsed_lists_meta_value(&meta_value); @@ -177,24 +177,18 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { parsed_lists_meta_value.InitialMetaValue(); batch.Put(handles_[0], key, meta_value); - } - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast(batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast(batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } diff --git a/src/storage/src/redis_lists.h b/src/storage/src/redis_lists.h index 9f23eee375..d56c5e47e7 100644 --- a/src/storage/src/redis_lists.h +++ b/src/storage/src/redis_lists.h @@ -27,7 +27,7 @@ class RedisLists : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; // Lists commands; Status LIndex(const Slice& key, int64_t index, std::string* element); diff --git a/src/storage/src/redis_sets.cc b/src/storage/src/redis_sets.cc index 48ae586e96..5707e032d4 100644 --- a/src/storage/src/redis_sets.cc +++ b/src/storage/src/redis_sets.cc @@ -152,7 +152,7 @@ rocksdb::Status RedisSets::ScanKeys(const std::string& pattern, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -161,7 +161,7 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t std::string key; std::string meta_value; - int32_t total_delete = 0; + int64_t total_delete = 0; rocksdb::Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]); @@ -169,7 +169,7 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t delete iter; }; iter->SeekToFirst(); - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); meta_value = iter->value().ToString(); ParsedSetsMetaValue parsed_sets_meta_value(&meta_value); @@ -177,24 +177,18 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { parsed_sets_meta_value.InitialMetaValue(); batch.Put(handles_[0], key, meta_value); - } - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast(batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast(batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } diff --git a/src/storage/src/redis_sets.h b/src/storage/src/redis_sets.h index 2898d0e9e7..139412da59 100644 --- a/src/storage/src/redis_sets.h +++ b/src/storage/src/redis_sets.h @@ -28,7 +28,7 @@ class RedisSets : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; // Setes Commands Status SAdd(const Slice& key, const std::vector& members, int32_t* ret); diff --git a/src/storage/src/redis_streams.cc b/src/storage/src/redis_streams.cc index 04abf437b8..48578ae5b5 100644 --- a/src/storage/src/redis_streams.cc +++ b/src/storage/src/redis_streams.cc @@ -443,7 +443,7 @@ Status RedisStreams::ScanKeys(const std::string& pattern, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -452,7 +452,7 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) std::string key; std::string meta_value; - int32_t total_delete = 0; + int64_t total_delete = 0; Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]); @@ -460,7 +460,7 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) delete iter; }; iter->SeekToFirst(); - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); meta_value = iter->value().ToString(); StreamMetaValue stream_meta_value; @@ -469,24 +469,18 @@ Status RedisStreams::PKPatternMatchDel(const std::string& pattern, int32_t* ret) (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { stream_meta_value.InitMetaValue(); batch.Put(handles_[0], key, stream_meta_value.value()); - } - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast(batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast(batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } diff --git a/src/storage/src/redis_streams.h b/src/storage/src/redis_streams.h index c964efef7f..e622a3db11 100644 --- a/src/storage/src/redis_streams.h +++ b/src/storage/src/redis_streams.h @@ -148,7 +148,7 @@ class RedisStreams : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* keyinfo) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; Status PKScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit, std::vector* keys, std::string* next_key); Status PKRScanRange(const Slice& key_start, const Slice& key_end, const Slice& pattern, int32_t limit, diff --git a/src/storage/src/redis_strings.cc b/src/storage/src/redis_strings.cc index ffa2cfcd18..6997531d83 100644 --- a/src/storage/src/redis_strings.cc +++ b/src/storage/src/redis_strings.cc @@ -115,7 +115,7 @@ Status RedisStrings::ScanKeys(const std::string& pattern, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -124,7 +124,7 @@ Status RedisStrings::PKPatternMatchDel(const std::string& pattern, int32_t* ret) std::string key; std::string value; - int32_t total_delete = 0; + int64_t total_delete = 0; Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options); @@ -132,35 +132,26 @@ Status RedisStrings::PKPatternMatchDel(const std::string& pattern, int32_t* ret) delete iter; }; iter->SeekToFirst(); - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); value = iter->value().ToString(); ParsedStringsValue parsed_strings_value(&value); if (!parsed_strings_value.IsStale() && (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { batch.Delete(key); - } - // In order to be more efficient, we use batch deletion here - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast(batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - delete iter; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast( batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } - delete iter; *ret = total_delete; return s; } @@ -1477,5 +1468,4 @@ void RedisStrings::ScanDatabase() { } delete iter; } - } // namespace storage diff --git a/src/storage/src/redis_strings.h b/src/storage/src/redis_strings.h index 2cb0bdb13f..ef11531d76 100644 --- a/src/storage/src/redis_strings.h +++ b/src/storage/src/redis_strings.h @@ -9,7 +9,6 @@ #include #include #include - #include "src/redis.h" namespace storage { @@ -26,9 +25,8 @@ class RedisStrings : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; - - // Strings Commands + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; + // Strings Command Status Append(const Slice& key, const Slice& value, int32_t* ret); Status BitCount(const Slice& key, int64_t start_offset, int64_t end_offset, int32_t* ret, bool have_range); Status BitOp(BitOpType op, const std::string& dest_key, const std::vector& src_keys, std::string &value_to_dest, int64_t* ret); diff --git a/src/storage/src/redis_zsets.cc b/src/storage/src/redis_zsets.cc index 72fa33dce1..4f1436b542 100644 --- a/src/storage/src/redis_zsets.cc +++ b/src/storage/src/redis_zsets.cc @@ -167,7 +167,7 @@ Status RedisZSets::ScanKeys(const std::string& pattern, std::vector return Status::OK(); } -Status RedisZSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { +Status RedisZSets::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) { rocksdb::ReadOptions iterator_options; const rocksdb::Snapshot* snapshot; ScopeSnapshot ss(db_, &snapshot); @@ -176,7 +176,7 @@ Status RedisZSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { std::string key; std::string meta_value; - int32_t total_delete = 0; + int64_t total_delete = 0; Status s; rocksdb::WriteBatch batch; rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]); @@ -184,7 +184,7 @@ Status RedisZSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { DEFER { delete iter; }; - while (iter->Valid()) { + while (iter->Valid() && static_cast(batch.Count()) < max_count) { key = iter->key().ToString(); meta_value = iter->value().ToString(); ParsedZSetsMetaValue parsed_zsets_meta_value(&meta_value); @@ -192,24 +192,18 @@ Status RedisZSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) { (StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) { parsed_zsets_meta_value.InitialMetaValue(); batch.Put(handles_[0], key, meta_value); - } - if (static_cast(batch.Count()) >= BATCH_DELETE_LIMIT) { - s = db_->Write(default_write_options_, &batch); - if (s.ok()) { - total_delete += static_cast(batch.Count()); - batch.Clear(); - } else { - *ret = total_delete; - return s; - } + remove_keys->push_back(key); } iter->Next(); } - if (batch.Count() != 0U) { + auto batchNum = batch.Count(); + if (batchNum != 0U) { s = db_->Write(default_write_options_, &batch); if (s.ok()) { - total_delete += static_cast(batch.Count()); + total_delete += static_cast(batchNum); batch.Clear(); + } else { + remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end()); } } diff --git a/src/storage/src/redis_zsets.h b/src/storage/src/redis_zsets.h index 76b2ec19b9..1a3ccb9b72 100644 --- a/src/storage/src/redis_zsets.h +++ b/src/storage/src/redis_zsets.h @@ -27,8 +27,7 @@ class RedisZSets : public Redis { Status GetProperty(const std::string& property, uint64_t* out) override; Status ScanKeyNum(KeyInfo* key_info) override; Status ScanKeys(const std::string& pattern, std::vector* keys) override; - Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override; - + Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector* remove_keys, const int64_t& max_count) override; // ZSets Commands Status ZAdd(const Slice& key, const std::vector& score_members, int32_t* ret); Status ZCard(const Slice& key, int32_t* card); diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index f348dc64be..b2f8f02f41 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -1137,26 +1137,27 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start, return s; } -Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) { +Status Storage::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, + std::vector* remove_keys, const int64_t& max_count) { Status s; switch (data_type) { case DataType::kStrings: - s = strings_db_->PKPatternMatchDel(pattern, ret); + s = strings_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStrings, pattern, ret, remove_keys, max_count - *ret); break; case DataType::kHashes: - s = hashes_db_->PKPatternMatchDel(pattern, ret); + s = hashes_db_->PKPatternMatchDelWithRemoveKeys(DataType::kHashes, pattern, ret, remove_keys, max_count - *ret); break; case DataType::kLists: - s = lists_db_->PKPatternMatchDel(pattern, ret); + s = lists_db_->PKPatternMatchDelWithRemoveKeys(DataType::kLists, pattern, ret, remove_keys, max_count - *ret); break; case DataType::kZSets: - s = zsets_db_->PKPatternMatchDel(pattern, ret); + s = zsets_db_->PKPatternMatchDelWithRemoveKeys(DataType::kZSets, pattern, ret, remove_keys, max_count - *ret); break; case DataType::kSets: - s = sets_db_->PKPatternMatchDel(pattern, ret); + s = sets_db_->PKPatternMatchDelWithRemoveKeys(DataType::kSets, pattern, ret, remove_keys, max_count - *ret); break; case DataType::kStreams: - s = streams_db_->PKPatternMatchDel(pattern, ret); + s = streams_db_->PKPatternMatchDelWithRemoveKeys(DataType::kStreams, pattern, ret, remove_keys, max_count - *ret); break; default: s = Status::Corruption("Unsupported data type"); @@ -1486,6 +1487,9 @@ void Storage::ScanDatabase(const DataType& type) { case kLists: lists_db_->ScanDatabase(); break; + case DataType::kStreams: + // do noting + break; case kAll: strings_db_->ScanDatabase(); hashes_db_->ScanDatabase(); diff --git a/src/storage/tests/keys_test.cc b/src/storage/tests/keys_test.cc index e828969bbe..745b3eaf1a 100644 --- a/src/storage/tests/keys_test.cc +++ b/src/storage/tests/keys_test.cc @@ -2062,7 +2062,6 @@ for (const auto& kv : kvs) { // int32_t delete_count; // std::vector keys; // std::map type_status; - // //=============================== Strings =============================== // // ***************** Group 1 Test ***************** @@ -2460,7 +2459,6 @@ for (const auto& kv : kvs) { // ASSERT_EQ(keys.size(), 0); // //=============================== List =============================== - // // ***************** Group 1 Test ***************** // db.LPush("GP1_PKPATTERNMATCHDEL_LIST_KEY1", {"VALUE"}, &ret64); // db.LPush("GP1_PKPATTERNMATCHDEL_LIST_KEY2", {"VALUE"}, &ret64);