Skip to content

Commit

Permalink
1 make write_binlog_thread_num configurable
Browse files Browse the repository at this point in the history
2 ensure TrySync resp is handled after binlog tasks
  • Loading branch information
cheniujh committed May 13, 2024
1 parent 070e0b0 commit 662083e
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 13 deletions.
13 changes: 11 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB cloud only bind to one sync-binlog-thread to write binlog in maximum
#[NOTICE] It's highly recommended to set sync_binlog_thread_num_ equal to database_(then each DB cloud have a exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which
# is used for replication.
Expand Down Expand Up @@ -101,6 +108,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return sync_thread_num_;
}
int sync_binlog_thread_num() {
std::shared_lock l(rwlock_);
return sync_binlog_thread_num_;
}
std::string log_path() {
std::shared_lock l(rwlock_);
return log_path_;
Expand Down Expand Up @@ -784,6 +788,7 @@ class PikaConf : public pstd::BaseConf {
int slow_cmd_thread_pool_size_ = 0;
std::unordered_set<std::string> slow_cmd_set_;
int sync_thread_num_ = 0;
int sync_binlog_thread_num_ = 0;
int expire_dump_days_ = 3;
int db_sync_speed_ = 0;
std::string slaveof_;
Expand Down
2 changes: 2 additions & 0 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class PikaReplClient {
pstd::Status Close(const std::string& ip, int port);

void Schedule(net::TaskFunc func, void* arg);
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
Expand All @@ -80,6 +81,7 @@ class PikaReplClient {
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
void UpdateNextAvail() { next_avail_ = (next_avail_ + 1) % static_cast<int32_t>(write_binlog_workers_.size()); }

Expand Down
1 change: 1 addition & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ class PikaReplicaManager {
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);

Expand Down
7 changes: 7 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ void InfoCmd::InfoServer(std::string& info) {
tmp_stream << "tcp_port:" << g_pika_conf->port() << "\r\n";
tmp_stream << "thread_num:" << g_pika_conf->thread_num() << "\r\n";
tmp_stream << "sync_thread_num:" << g_pika_conf->sync_thread_num() << "\r\n";
tmp_stream << "sync_binlog_thread_num:" << g_pika_conf->sync_binlog_thread_num() << "\r\n";
tmp_stream << "uptime_in_seconds:" << (current_time_s - g_pika_server->start_time_s()) << "\r\n";
tmp_stream << "uptime_in_days:" << (current_time_s / (24 * 3600) - g_pika_server->start_time_s() / (24 * 3600) + 1)
<< "\r\n";
Expand Down Expand Up @@ -1516,6 +1517,12 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeNumber(&config_body, g_pika_conf->sync_thread_num());
}

if (pstd::stringmatch(pattern.data(), "sync-binlog-thread-num", 1) != 0) {
elements += 2;
EncodeString(&config_body, "sync-binlog-thread-num");
EncodeNumber(&config_body, g_pika_conf->sync_binlog_thread_num());
}

if (pstd::stringmatch(pattern.data(), "log-path", 1) != 0) {
elements += 2;
EncodeString(&config_body, "log-path");
Expand Down
9 changes: 9 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,15 @@ int PikaConf::Load() {
}
default_db_ = db_structs_[0].db_name;

// sync_binlog_thread_num_ must be set after the setting of databases_
GetConfInt("sync-binlog-thread-num", &sync_binlog_thread_num_);
if (sync_binlog_thread_num_ <= 0){
sync_binlog_thread_num_ = databases_;
}else {
// final value is MIN(sync_binlog_thread_num, databases_)
sync_binlog_thread_num_ = sync_binlog_thread_num_ > databases_ ? databases_ : sync_binlog_thread_num_;
}

int tmp_replication_num = 0;
GetConfInt("replication-num", &tmp_replication_num);
if (tmp_replication_num > 4 || tmp_replication_num < 0) {
Expand Down
26 changes: 17 additions & 9 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern std::unique_ptr<PikaReplicaManager> g_pika_rm;
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for(int i = 0; i < g_pika_conf->databases(); i++){
for(int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++){
write_binlog_workers_.push_back(std::make_unique<PikaReplBgWorker>(PIKA_SYNC_BUFFER_SIZE));
}
for (int i = 0; i < g_pika_conf->sync_thread_num(); ++i) {
Expand Down Expand Up @@ -79,17 +79,15 @@ void PikaReplClient::Schedule(net::TaskFunc func, void* arg) {
UpdateNextAvail();
}

void PikaReplClient::ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name) {
size_t index = GetBinlogWorkerIndexByDBName(db_name);
write_binlog_workers_[index]->Schedule(func, arg);
};

void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
char db_num = db_name.back();
int index = db_num - '0';
if (index < 0 || index > write_binlog_workers_.size()) {
LOG(ERROR)
<< "Corruption in cosuming binlog: the last char of the db_name(extracted from binlog) is not a valid db num, the extracted db_num/worker_index is "
<< index << " while write_binlog_workers.size() is " << write_binlog_workers_.size();
return;
}
size_t index = GetBinlogWorkerIndexByDBName(db_name);
auto task_arg = new ReplClientWriteBinlogTaskArg(res, conn, res_private_data, write_binlog_workers_[index].get());
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}
Expand All @@ -103,6 +101,16 @@ void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, co
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
char db_num = db_name.back();
if (db_num < '0' || db_num > '8') {
LOG(ERROR)
<< "Corruption in consuming binlog: the last char of the db_name(extracted from binlog) is not a valid db num, the extracted db_num is "
<< db_num << " while write_binlog_workers.size() is " << write_binlog_workers_.size();
}
return (db_num - '0') % write_binlog_workers_.size();
}

size_t PikaReplClient::GetHashIndexByKey(const std::string& key) {
size_t hash_base = write_db_workers_.size();
return (str_hash(key) % hash_base);
Expand Down
6 changes: 4 additions & 2 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ int PikaReplClientConn::DealMessage() {
break;
}
case InnerMessage::kTrySync: {
const std::string& db_name = response->try_sync().slot().db_name();
//TrySync resp must contain db_name
assert(!db_name.empty());
auto task_arg =
new ReplClientTaskArg(response, std::dynamic_pointer_cast<PikaReplClientConn>(shared_from_this()));
g_pika_rm->ScheduleReplClientBGTask(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg));
g_pika_rm->ScheduleReplClientBGTaskByDBName(&PikaReplClientConn::HandleTrySyncResponse, static_cast<void*>(task_arg), db_name);
break;
}
case InnerMessage::kBinlogSync: {
Expand Down Expand Up @@ -193,7 +196,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
LOG(WARNING) << "TrySync Failed: " << reply;
return;
}

const InnerMessage::InnerResponse_TrySync& try_sync_response = response->try_sync();
const InnerMessage::Slot& db_response = try_sync_response.slot();
std::string db_name = db_response.db_name();
Expand Down
4 changes: 4 additions & 0 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,10 @@ void PikaReplicaManager::ScheduleReplClientBGTask(net::TaskFunc func, void* arg)
pika_repl_client_->Schedule(func, arg);
}

void PikaReplicaManager::ScheduleReplClientBGTaskByDBName(net::TaskFunc func, void* arg, const std::string &db_name) {
pika_repl_client_->ScheduleByDBName(func, arg, db_name);
}

void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data) {
Expand Down

0 comments on commit 662083e

Please sign in to comment.