Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Reconstruct slave sync thread model #2638

Merged
13 changes: 11 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The number of threads used to write binlog on the slave node when replicating.
# Each DB can be bound to at most one sync-binlog-thread for writing binlog.
# [NOTICE] It's highly recommended to set sync-binlog-thread-num equal to databases (then each DB could have an exclusive thread to write binlog),
# e.g. if you use 8 DBs (databases is 8), sync-binlog-thread-num is preferably 8.
# The valid range of sync-binlog-thread-num is [1, databases]; the final value is Min(sync-binlog-thread-num, databases).
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binlog (write2file) file which
# is used for replication.
Expand Down Expand Up @@ -101,6 +108,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return sync_thread_num_;
}
// Returns sync_binlog_thread_num_, the value loaded from the "sync-binlog-thread-num"
// config key. The read is done while holding a shared lock on rwlock_.
int sync_binlog_thread_num() {
std::shared_lock l(rwlock_);
return sync_binlog_thread_num_;
}
std::string log_path() {
std::shared_lock l(rwlock_);
return log_path_;
Expand Down Expand Up @@ -793,6 +797,7 @@ class PikaConf : public pstd::BaseConf {
int slow_cmd_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
int db_sync_speed_ = 0;
std::string slaveof_;
Expand Down
9 changes: 6 additions & 3 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class PikaReplClient {
pstd::Status Close(const std::string& ip, int port);

void Schedule(net::TaskFunc func, void* arg);
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
Expand All @@ -80,13 +81,15 @@ class PikaReplClient {
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

private:
size_t GetHashIndex(const std::string& key, bool upper_half);
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(bg_workers_.size()); }
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
// Advances next_avail_ to the next index into write_binlog_workers_, wrapping back to 0
// after the last worker. NOTE(review): assumes write_binlog_workers_ is non-empty — confirm.
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(write_binlog_workers_.size()); }

std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;
std::vector<std::unique_ptr<PikaReplBgWorker>> bg_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};

#endif
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class PikaReplicaManager {
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
// Schedules func(arg) as a repl-client background task, routed by db_name.
void ScheduleReplClientBGTaskByDBName(net::TaskFunc func, void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

Expand Down
7 changes: 7 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ void InfoCmd::InfoServer(std::string& info) {
tmp_stream << "tcp_port:" << g_pika_conf->port() << "\r\n";
tmp_stream << "thread_num:" << g_pika_conf->thread_num() << "\r\n";
tmp_stream << "sync_thread_num:" << g_pika_conf->sync_thread_num() << "\r\n";
tmp_stream << "sync_binlog_thread_num:" << g_pika_conf->sync_binlog_thread_num() << "\r\n";
tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pika_server->start_time_s()) << "\r\n";
tmp_stream << "uptime_in_days:" << (current_time_s / (24 * 3600) - g_pika_server->start_time_s() / (24 * 3600) + 1)
<< "\r\n";
Expand Down Expand Up @@ -1501,6 +1502,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->sync_thread_num());
}

if (pstd::stringmatch(pattern.data(), "sync-binlog-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-binlog-thread-num");
EncodeNumber(&config_body, g_pika_conf->sync_binlog_thread_num());
}

if (pstd::stringmatch(pattern.data(), "log-path", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-path");
Expand Down
9 changes: 9 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ int PikaConf::Load() {
}
default_db_ = db_structs_[0].db_name;

// sync_binlog_thread_num_ must be set after the setting of databases_
GetConfInt("sync-binlog-thread-num", &sync_binlog_thread_num_);
if (sync_binlog_thread_num_ <= 0){
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
sync_binlog_thread_num_ = databases_;
}else {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
// final value is MIN(sync_binlog_thread_num, databases_)
sync_binlog_thread_num_ = sync_binlog_thread_num_ > databases_ ? databases_ : sync_binlog_thread_num_;
}

int tmp_replication_num = 0;
GetConfInt("replication-num", &tmp_replication_num);
if (tmp_replication_num > 4 || tmp_replication_num < 0) {
Expand Down
60 changes: 44 additions & 16 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < 2 * g_pika_conf->sync_thread_num(); ++i) {
bg_workers_.push_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
for(int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++){
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
write_binlog_workers_.push_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
write_db_workers_.push_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -43,49 +46,74 @@ int PikaReplClient::Start() {
LOG(FATAL) << "Start ReplClient ClientThread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
for (auto & bg_worker : bg_workers_) {
res = bg_worker->StartThread();
for (auto & binlog_worker : write_binlog_workers_) {
res = binlog_worker->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Worker Thread Error: " << res
LOG(FATAL) << "Start Pika Repl Write Binlog Worker Thread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
}
for (auto & db_worker : write_db_workers_) {
res = db_worker->StartThread();
if (res != net::kSuccess) {
LOG(FATAL) << "Start Pika Repl Write DB Worker Thread Error: " << res
<< (res == net::kCreateThreadError ? ": create thread error " : ": other error");
}
}
return res;
}

int PikaReplClient::Stop() {
client_thread_->StopThread();
for (auto & bg_worker : bg_workers_) {
bg_worker->StopThread();
for (auto & binlog_worker : write_binlog_workers_) {
binlog_worker->StopThread();
}
for (auto &db_worker: write_db_workers_) {
db_worker->StopThread();
}
return 0;
}

void PikaReplClient::Schedule(net::TaskFunc func, void* arg) {
bg_workers_[next_avail_]->Schedule(func, arg);
write_binlog_workers_[next_avail_]->Schedule(func, arg);
UpdateNextAvail();
}

// Schedules func(arg) on the binlog worker selected for db_name.
// The worker index comes from GetBinlogWorkerIndexByDBName, so tasks submitted
// with the same DB name are handed to the same worker.
void PikaReplClient::ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name) {
  const size_t index = GetBinlogWorkerIndexByDBName(db_name);
  write_binlog_workers_[index]->Schedule(func, arg);
}

void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
size_t index = GetHashIndex(db_name, true);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, bg_workers_[index].get());
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
size_t index = GetBinlogWorkerIndexByDBName(db_name);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, write_binlog_workers_[index].get());
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}

void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndex(dispatch_key, false);
size_t index = GetHashIndexByKey(dispatch_key);
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name);
bg_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
char db_num = db_name.back();
if (db_num < '0' || db_num > '8') {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
LOG(ERROR)
<< "Corruption in consuming binlog: the last char of the db_name(extracted from binlog) is not a valid db num, the extracted db_num is "
<< db_num << " while write_binlog_workers.size() is " << write_binlog_workers_.size();
}
return (db_num - '0') % write_binlog_workers_.size();
}

size_t PikaReplClient::GetHashIndex(const std::string& key, bool upper_half) {
size_t hash_base = bg_workers_.size() / 2;
return (str_hash(key) % hash_base) + (upper_half ? 0 : hash_base);
// Picks the write-DB worker for a key: the key's hash (str_hash) reduced modulo
// the number of entries in write_db_workers_.
size_t PikaReplClient::GetHashIndexByKey(const std::string& key) {
  return str_hash(key) % write_db_workers_.size();
}

Status PikaReplClient::Write(const std::string& ip, const int port, const std::string& msg) {
Expand Down
6 changes: 4 additions & 2 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ int PikaReplClientConn::DealMessage() {
break;
}
case InnerMessage::kTrySync: {
const std::string& db_name = response->try_sync().slot().db_name();
//TrySync resp must contain db_name
assert(!db_name.empty());
auto task_arg =
new ReplClientTaskArg(response, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()));
g_pika_rm->ScheduleReplClientBGTask(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg));
g_pika_rm->ScheduleReplClientBGTaskByDBName(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg), db_name);
break;
}
case InnerMessage::kBinlogSync: {
Expand Down Expand Up @@ -193,7 +196,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
LOG(WARNING) << "TrySync Failed: " << reply;
return;
}

const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
const InnerMessage::Slot& db_response = try_sync_response.slot();
std::string db_name = db_response.db_name();
Expand Down
4 changes: 4 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ void PikaReplicaManager::ScheduleReplClientBGTask(net::TaskFunc func, void* arg)
pika_repl_client_->Schedule(func, arg);
}

// Forwards the task (func, arg) and db_name unchanged to pika_repl_client_->ScheduleByDBName.
void PikaReplicaManager::ScheduleReplClientBGTaskByDBName(net::TaskFunc func, void* arg, const std::string &db_name) {
pika_repl_client_->ScheduleByDBName(func, arg, db_name);
}

void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
Expand Down
Loading