Skip to content

Commit

Permalink
fix: Reconstruct slave sync thread model (#2638)
Browse files Browse the repository at this point in the history
* reconstruct slave comsuming thread model, new model:
1 each db has one exclusive thread to write binlog
2 every db share the same thread pool to write db

* 1 make write_binlog_thread_num configurable
2 ensure TrySync resp is handled after binlog tasks

* 1 add extra 10s sleep in randomSpopstore test to avoid the sporadic failure of this test case
2 revised some comments about write-binlog-worker-num in pika.conf

* 1 use global constexpr to replace fixed num in terms of max_db_num
2 done some format work

---------

Co-authored-by: cjh <[email protected]>
  • Loading branch information
2 people authored and brother-jin committed Jul 31, 2024
1 parent eb2723c commit f8fd697
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 24 deletions.
13 changes: 11 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB cloud only bind to one sync-binlog-thread to write binlog in maximum
#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB cloud have a exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which
# is used for replication.
Expand Down Expand Up @@ -97,6 +104,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return sync_thread_num_;
}
int sync_binlog_thread_num() {
std::shared_lock l(rwlock_);
return sync_binlog_thread_num_;
}
std::string log_path() {
std::shared_lock l(rwlock_);
return log_path_;
Expand Down Expand Up @@ -744,6 +748,7 @@ class PikaConf : public pstd::BaseConf {
int slow_cmd_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
int db_sync_speed_ = 0;
std::string slaveof_;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#define PIKA_SERVER_ID_MAX 65535

class PikaServer;
/* Global Const */
constexpr int MAX_DB_NUM = 8;

/* Port shift */
const int kPortShiftRSync = 1000;
Expand Down
9 changes: 6 additions & 3 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class PikaReplClient {
pstd::Status Close(const std::string& ip, int port);

void Schedule(net::TaskFunc func, void* arg);
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
Expand All @@ -80,13 +81,15 @@ class PikaReplClient {
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

private:
size_t GetHashIndex(const std::string& key, bool upper_half);
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(bg_workers_.size()); }
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(write_binlog_workers_.size()); }

std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;
std::vector<std::unique_ptr<PikaReplBgWorker>> bg_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};

#endif
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class PikaReplicaManager {
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

Expand Down
7 changes: 7 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,7 @@ void InfoCmd::InfoServer(std::string& info) {
tmp_stream << "tcp_port:" << g_pika_conf->port() << "\r\n";
tmp_stream << "thread_num:" << g_pika_conf->thread_num() << "\r\n";
tmp_stream << "sync_thread_num:" << g_pika_conf->sync_thread_num() << "\r\n";
tmp_stream << "sync_binlog_thread_num:" << g_pika_conf->sync_binlog_thread_num() << "\r\n";
tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pika_server->start_time_s()) << "\r\n";
tmp_stream << "uptime_in_days:" << (current_time_s / (24 * 3600) - g_pika_server->start_time_s() / (24 * 3600) + 1)
<< "\r\n";
Expand Down Expand Up @@ -1541,6 +1542,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->sync_thread_num());
}

if (pstd::stringmatch(pattern.data(), "sync-binlog-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-binlog-thread-num");
EncodeNumber(&config_body, g_pika_conf->sync_binlog_thread_num());
}

if (pstd::stringmatch(pattern.data(), "log-path", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-path");
Expand Down
11 changes: 10 additions & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ int PikaConf::Load() {

if (classic_mode_.load()) {
GetConfInt("databases", &databases_);
if (databases_ < 1 || databases_ > 8) {
if (databases_ < 1 || databases_ > MAX_DB_NUM) {
LOG(FATAL) << "config databases error, limit [1 ~ 8], the actual is: " << databases_;
}
for (int idx = 0; idx < databases_; ++idx) {
Expand All @@ -184,6 +184,15 @@ int PikaConf::Load() {
}
default_db_ = db_structs_[0].db_name;

// sync_binlog_thread_num_ must be set after the setting of databases_
GetConfInt("sync-binlog-thread-num", &sync_binlog_thread_num_);
if (sync_binlog_thread_num_ <= 0) {
sync_binlog_thread_num_ = databases_;
} else {
// final value is MIN(sync_binlog_thread_num, databases_)
sync_binlog_thread_num_ = sync_binlog_thread_num_ > databases_ ? databases_ : sync_binlog_thread_num_;
}

int tmp_replication_num = 0;
GetConfInt("replication-num", &tmp_replication_num);
if (tmp_replication_num > 4 || tmp_replication_num < 0) {
Expand Down
63 changes: 47 additions & 16 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < 2 * g_pika_conf->sync_thread_num(); ++i) {
bg_workers_.push_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
write_binlog_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
write_db_workers_.emplace_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
}
}

Expand All @@ -43,49 +46,77 @@ int PikaReplClient::Start() {
LOG(FATAL) << "Start ReplClient ClientThread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
for (auto & bg_worker : bg_workers_) {
res = bg_worker->StartThread();
for (auto & binlog_worker : write_binlog_workers_) {
res = binlog_worker->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Worker Thread Error: " << res
LOG(FATAL) << "Start Pika Repl Write Binlog Worker Thread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
}
for (auto & db_worker : write_db_workers_) {
res = db_worker->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Write DB Worker Thread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
}
return res;
}

int PikaReplClient::Stop() {
client_thread_->StopThread();
for (auto & bg_worker : bg_workers_) {
bg_worker->StopThread();
for (auto & binlog_worker : write_binlog_workers_) {
binlog_worker->StopThread();
}
for (auto &db_worker: write_db_workers_) {
db_worker->StopThread();
}
return 0;
}

void PikaReplClient::Schedule(net::TaskFunc func, void* arg) {
bg_workers_[next_avail_]->Schedule(func, arg);
write_binlog_workers_[next_avail_]->Schedule(func, arg);
UpdateNextAvail();
}

void PikaReplClient::ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name) {
size_t index = GetBinlogWorkerIndexByDBName(db_name);
write_binlog_workers_[index]->Schedule(func, arg);
};

void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
size_t index = GetHashIndex(db_name, true);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, bg_workers_[index].get());
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
size_t index = GetBinlogWorkerIndexByDBName(db_name);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, write_binlog_workers_[index].get());
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}

void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndex(dispatch_key, false);
size_t index = GetHashIndexByKey(dispatch_key);
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name);
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
char db_num_c = db_name.back();
int32_t db_num = db_num_c - '0';
//Valid range of db_num is [0, MAX_DB_NUM)
if (db_num < 0 || db_num >= MAX_DB_NUM) {
LOG(ERROR)
<< "Corruption in consuming binlog: the last char of the db_name(extracted from binlog) is not a valid db num, the extracted db_num is "
<< db_num_c << " while write_binlog_workers.size() is " << write_binlog_workers_.size();
if (db_num < 0) { assert(false && "db_num invalid, check if the db_name in the request is valid, also check the ERROR Log of Pika."); }
}
return db_num % write_binlog_workers_.size();
}

size_t PikaReplClient::GetHashIndex(const std::string& key, bool upper_half) {
size_t hash_base = bg_workers_.size() / 2;
return (str_hash(key) % hash_base) + (upper_half ? 0 : hash_base);
size_t PikaReplClient::GetHashIndexByKey(const std::string& key) {
size_t hash_base = write_db_workers_.size();
return (str_hash(key) % hash_base);
}

Status PikaReplClient::Write(const std::string& ip, const int port, const std::string& msg) {
Expand Down
6 changes: 4 additions & 2 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ int PikaReplClientConn::DealMessage() {
break;
}
case InnerMessage::kTrySync: {
const std::string& db_name = response->try_sync().slot().db_name();
//TrySync resp must contain db_name
assert(!db_name.empty());
auto task_arg =
new ReplClientTaskArg(response, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()));
g_pika_rm->ScheduleReplClientBGTask(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg));
g_pika_rm->ScheduleReplClientBGTaskByDBName(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg), db_name);
break;
}
case InnerMessage::kBinlogSync: {
Expand Down Expand Up @@ -192,7 +195,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
LOG(WARNING) << "TrySync Failed: " << reply;
return;
}

const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
const InnerMessage::Slot& db_response = try_sync_response.slot();
std::string db_name = db_response.db_name();
Expand Down
4 changes: 4 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,10 @@ void PikaReplicaManager::ScheduleReplClientBGTask(net::TaskFunc func, void* arg)
pika_repl_client_->Schedule(func, arg);
}

void PikaReplicaManager::ScheduleReplClientBGTaskByDBName(net::TaskFunc func, void* arg, const std::string &db_name) {
pika_repl_client_->ScheduleByDBName(func, arg, db_name);
}

void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
Expand Down
1 change: 1 addition & 0 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ var _ = Describe("should replication ", func() {

log.Println("randomSpopstore test start")
execute(&ctx, clientMaster, 4, randomSpopstroeThread)
time.Sleep(10 * time.Second)
master_spopstore_set := clientMaster.SMembers(ctx, "set1")
Expect(master_spopstore_set.Err()).NotTo(HaveOccurred())
slave_spopstore_set := clientSlave.SMembers(ctx, "set1")
Expand Down

0 comments on commit f8fd697

Please sign in to comment.