Skip to content

Commit

Permalink
fix: PkPatternMatchDel inconsistent between rediscache and db (OpenAt…
Browse files Browse the repository at this point in the history
…omFoundation#2839)

* fix: PkPatternMatchDel inconsistent between rediscache and db

---------

Co-authored-by: haiyang426 <[email protected]>
Co-authored-by: chejinge <[email protected]>
  • Loading branch information
3 people authored Aug 7, 2024
1 parent faf6a01 commit 322619f
Show file tree
Hide file tree
Showing 20 changed files with 134 additions and 125 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ ExternalProject_Add(rediscache
set(REDISCACHE_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(REDISCACHE_LIBRARY ${INSTALL_LIBDIR}/librediscache.a)


option(USE_PIKA_TOOLS "compile pika-tools" OFF)
if (USE_PIKA_TOOLS)
ExternalProject_Add(hiredis
Expand Down
5 changes: 1 addition & 4 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,9 @@ slave-priority : 100
# The disable_auto_compactions option is [true | false]
disable_auto_compactions : false

<<<<<<< HEAD
=======
# Rocksdb max_subcompactions, increasing this value can accelerate the exec speed of a single compaction task
# it's recommended to increase it's value if large compaction is found in you instance
max-subcompactions : 1
>>>>>>> f95f867c (fix: Revised RocksDB-Related Parameters in Pika (#2728))
# The minimum disk usage ratio for checking resume.
# If the disk usage ratio is lower than min-check-resume-ratio, it will not check resume, only higher will check resume.
# Its default value is 0.7.
Expand Down Expand Up @@ -501,7 +498,7 @@ default-slot-num : 1024
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000
<<<<<<< HEAD

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
Expand Down
4 changes: 4 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,13 +479,17 @@ class PKPatternMatchDelCmd : public Cmd {
PKPatternMatchDelCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::ADMIN)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }

private:
storage::DataType type_ = storage::kAll;
std::vector<std::string> remove_keys_;
std::string pattern_;
int64_t max_count_;
void DoInitial() override;
};

Expand Down
59 changes: 55 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3098,15 +3098,66 @@ void PKPatternMatchDelCmd::DoInitial() {
res_.SetRes(CmdRes::kInvalidDbType, kCmdNamePKPatternMatchDel);
return;
}
max_count_ = storage::BATCH_DELETE_LIMIT;
if (argv_.size() > 3) {
if (pstd::string2int(argv_[3].data(), argv_[3].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}
}

void PKPatternMatchDelCmd::Do() {
int ret = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret);
if (s.ok()) {
res_.AppendInteger(ret);
int64_t count = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(type_, pattern_, &count, &remove_keys_, max_count_);

if(s.ok()) {
res_.AppendInteger(count);
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
if (count >= 0) {
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
}
}
}

void PKPatternMatchDelCmd::DoThroughDB() {
Do();
}

void PKPatternMatchDelCmd::DoUpdateCache() {
if (s_.ok()) {
std::vector<std::string> v;
for (auto key : remove_keys_) {
// only delete the corresponding prefix
switch (type_) {
case storage::kSets:
v.emplace_back(PCacheKeyPrefixS + key);
break;
case storage::kLists:
v.emplace_back(PCacheKeyPrefixL + key);
break;
case storage::kStrings:
v.emplace_back(PCacheKeyPrefixK + key);
break;
case storage::kZSets:
v.emplace_back(PCacheKeyPrefixZ + key);
break;
case storage::kHashes:
v.emplace_back(PCacheKeyPrefixH + key);
break;
default:
break;
}
}
db_->cache()->Del(v);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ class Storage {

// Traverses the database of the specified type, removing the Key that matches
// the pattern
Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);

// Iterate over a collection of elements
// return next_key that the user need to use as the start_key argument
Expand Down
3 changes: 1 addition & 2 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ class Redis {
virtual Status GetProperty(const std::string& property, uint64_t* out) = 0;
virtual Status ScanKeyNum(KeyInfo* key_info) = 0;
virtual Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) = 0;
virtual Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) = 0;

// Keys Commands
virtual Status Expire(const Slice& key, int32_t ttl) = 0;
Expand All @@ -110,7 +109,7 @@ class Redis {
virtual Status Expireat(const Slice& key, int32_t timestamp) = 0;
virtual Status Persist(const Slice& key) = 0;
virtual Status TTL(const Slice& key, int64_t* timestamp) = 0;

virtual Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) = 0;
Status SetMaxCacheStatisticKeys(size_t max_cache_statistic_keys);
Status SetSmallCompactionThreshold(uint64_t small_compaction_threshold);
Status SetSmallCompactionDurationThreshold(uint64_t small_compaction_duration_threshold);
Expand Down
24 changes: 9 additions & 15 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Status RedisHashes::ScanKeys(const std::string& pattern, std::vector<std::string
return Status::OK();
}

Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisHashes::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -154,40 +154,34 @@ Status RedisHashes::PKPatternMatchDel(const std::string& pattern, int32_t* ret)

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
if (!parsed_hashes_meta_value.IsStale() && (parsed_hashes_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_hashes_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>( batch.Count());
batch.Clear();
} else {
*ret = total_delete;
return s;
}
remove_keys->push_back(key);
}
iter->Next();
}
if (batch.Count() != 0U) {
auto batchNum = batch.Count();
if (batchNum != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batchNum);
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_hashes.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RedisHashes : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Hashes Commands
Status HDel(const Slice& key, const std::vector<std::string>& fields, int32_t* ret);
Expand Down
24 changes: 9 additions & 15 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status RedisLists::ScanKeys(const std::string& pattern, std::vector<std::string>
return Status::OK();
}

Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
Status RedisLists::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,40 +161,34 @@ Status RedisLists::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
if (!parsed_lists_meta_value.IsStale() && (parsed_lists_meta_value.count() != 0U) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_lists_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
return s;
}
remove_keys->push_back(key);
}
iter->Next();
}
if (batch.Count() != 0U) {
auto batchNum = batch.Count();
if (batchNum != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batchNum);
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_lists.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class RedisLists : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Lists commands;
Status LIndex(const Slice& key, int64_t index, std::string* element);
Expand Down
24 changes: 9 additions & 15 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ rocksdb::Status RedisSets::ScanKeys(const std::string& pattern, std::vector<std:
return rocksdb::Status::OK();
}

rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
rocksdb::Status RedisSets::PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -161,40 +161,34 @@ rocksdb::Status RedisSets::PKPatternMatchDel(const std::string& pattern, int32_t

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
rocksdb::Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[0]);
DEFER {
delete iter;
};
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
key = iter->key().ToString();
meta_value = iter->value().ToString();
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
if (!parsed_sets_meta_value.IsStale() && (parsed_sets_meta_value.count() != 0) &&
(StringMatch(pattern.data(), pattern.size(), key.data(), key.size(), 0) != 0)) {
parsed_sets_meta_value.InitialMetaValue();
batch.Put(handles_[0], key, meta_value);
}
if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
return s;
}
remove_keys->push_back(key);
}
iter->Next();
}
if (batch.Count() != 0U) {
auto batchNum = batch.Count();
if (batchNum != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batchNum);
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batchNum, remove_keys->end());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis_sets.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RedisSets : public Redis {
Status GetProperty(const std::string& property, uint64_t* out) override;
Status ScanKeyNum(KeyInfo* key_info) override;
Status ScanKeys(const std::string& pattern, std::vector<std::string>* keys) override;
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret) override;
Status PKPatternMatchDelWithRemoveKeys(const DataType& data_type, const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) override;

// Setes Commands
Status SAdd(const Slice& key, const std::vector<std::string>& members, int32_t* ret);
Expand Down
Loading

0 comments on commit 322619f

Please sign in to comment.