diff --git a/include/pika_server.h b/include/pika_server.h index 691a0fea08..3154bbed01 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -14,6 +14,7 @@ # include #endif #include +#include #include "net/include/bg_thread.h" #include "net/include/net_pubsub.h" @@ -30,7 +31,6 @@ #include "include/pika_conf.h" #include "include/pika_define.h" #include "include/pika_dispatch_thread.h" -#include "include/pika_monitor_thread.h" #include "include/pika_repl_client.h" #include "include/pika_repl_server.h" #include "include/pika_rsync_service.h" @@ -266,9 +266,9 @@ class PikaServer : public pstd::noncopyable { /* * Monitor used */ - bool HasMonitorClients(); + bool HasMonitorClients() const; void AddMonitorMessage(const std::string& monitor_message); - void AddMonitorClient(const std::shared_ptr &client_ptr); + void AddMonitorClient(std::shared_ptr client_ptr); /* * Slowlog used @@ -410,7 +410,8 @@ class PikaServer : public pstd::noncopyable { /* * Monitor used */ - std::unique_ptr pika_monitor_thread_; + mutable pstd::Mutex monitor_mutex_protector_; + std::set, std::owner_less>> pika_monitor_clients_; /* * Rsync used diff --git a/src/net/include/thread_pool.h b/src/net/include/thread_pool.h index c9e6928a31..c3b3999914 100644 --- a/src/net/include/thread_pool.h +++ b/src/net/include/thread_pool.h @@ -65,7 +65,6 @@ class ThreadPool : public pstd::noncopyable { void cur_time_queue_size(size_t* qsize); std::string thread_pool_name(); - private: void runInThread(); diff --git a/src/net/src/backend_thread.cc b/src/net/src/backend_thread.cc index 134852131c..a6d943c9ca 100644 --- a/src/net/src/backend_thread.cc +++ b/src/net/src/backend_thread.cc @@ -447,7 +447,7 @@ void* BackendThread::ThreadMain() { } } - if ((pfe->mask & kErrorEvent) || (should_close)) { + if ((pfe->mask & kErrorEvent) || should_close) { { LOG(INFO) << "close connection " << pfe->fd << " reason " << pfe->mask << " " << should_close; net_multiplexer_->NetDelEvent(pfe->fd, 0); diff --git a/src/net/src/dispatch_thread.cc b/src/net/src/dispatch_thread.cc index ab04644ceb..83ad7cdef9 100644 --- a/src/net/src/dispatch_thread.cc +++ b/src/net/src/dispatch_thread.cc @@ -55,7 +55,7 @@ DispatchThread::~DispatchThread() = default; int DispatchThread::StartThread() { for (int i = 0; i < work_num_; i++) { int ret = handle_->CreateWorkerSpecificData(&(worker_thread_[i]->private_data_)); - if (ret) { + if (ret) { return ret; } @@ -63,7 +63,7 @@ int DispatchThread::StartThread() { worker_thread_[i]->set_thread_name("WorkerThread"); } ret = worker_thread_[i]->StartThread(); - if (ret) { + if (ret) { return ret; } } @@ -76,12 +76,12 @@ int DispatchThread::StopThread() { } for (int i = 0; i < work_num_; i++) { int ret = worker_thread_[i]->StopThread(); - if (ret) { + if (ret) { return ret; } if (worker_thread_[i]->private_data_) { ret = handle_->DeleteWorkerSpecificData(worker_thread_[i]->private_data_); - if (ret) { + if (ret) { return ret; } worker_thread_[i]->private_data_ = nullptr; diff --git a/src/net/src/holy_thread.cc b/src/net/src/holy_thread.cc index 2d63eb4a5c..b7c2f1990a 100644 --- a/src/net/src/holy_thread.cc +++ b/src/net/src/holy_thread.cc @@ -72,7 +72,7 @@ std::shared_ptr HolyThread::get_conn(int fd) { int HolyThread::StartThread() { int ret = handle_->CreateWorkerSpecificData(&private_data_); - if (ret) { + if (ret) { return ret; } return ServerThread::StartThread(); @@ -81,7 +81,7 @@ int HolyThread::StartThread() { int HolyThread::StopThread() { if (private_data_) { int ret = handle_->DeleteWorkerSpecificData(private_data_); - if (ret) { + if (ret) { return ret; } private_data_ = nullptr; @@ -118,7 +118,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) { } if (async_) { - if ((pfe->mask & kReadable) != 0) { + if (pfe->mask & kReadable) { ReadStatus read_status = in_conn->GetRequest(); struct timeval now; gettimeofday(&now, nullptr); @@ -132,7 +132,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) { should_close = 1; } } - if (((pfe->mask & kWritable) != 0) && in_conn->is_reply()) { + if ((pfe->mask & kWritable) && in_conn->is_reply()) { WriteStatus write_status = in_conn->SendReply(); if (write_status == kWriteAll) { in_conn->set_is_reply(false); @@ -144,7 +144,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) { } } } else { - if ((pfe->mask & kReadable) != 0) { + if (pfe->mask & kReadable) { ReadStatus getRes = in_conn->GetRequest(); struct timeval now; gettimeofday(&now, nullptr); @@ -158,7 +158,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) { return; } } - if ((pfe->mask & kWritable) != 0) { + if (pfe->mask & kWritable) { WriteStatus write_status = in_conn->SendReply(); if (write_status == kWriteAll) { in_conn->set_is_reply(false); @@ -170,7 +170,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) { } } } - if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) { + if ((pfe->mask & kErrorEvent) || should_close) { net_multiplexer_->NetDelEvent(pfe->fd, 0); CloseFd(in_conn); in_conn = nullptr; @@ -192,7 +192,7 @@ void HolyThread::DoCronTask() { // Check whether close all connection std::lock_guard kl(killer_mutex_); - if (deleting_conn_ipport_.count(kKillAllConnsTask) != 0U) { + if (deleting_conn_ipport_.count(kKillAllConnsTask)) { for (auto& conn : conns_) { to_close.push_back(conn.second); } @@ -208,7 +208,7 @@ void HolyThread::DoCronTask() { while (iter != conns_.end()) { std::shared_ptr conn = iter->second; // Check connection should be closed - if (deleting_conn_ipport_.count(conn->ip_port()) != 0U) { + if (deleting_conn_ipport_.count(conn->ip_port())) { to_close.push_back(conn); deleting_conn_ipport_.erase(conn->ip_port()); iter = conns_.erase(iter); @@ -277,7 +277,7 @@ bool HolyThread::KillConn(const std::string& ip_port) { } void HolyThread::ProcessNotifyEvents(const net::NetFiredEvent* pfe) { - if ((pfe->mask & kReadable) != 0) { + if (pfe->mask & kReadable) { char bb[2048]; int32_t nread = read(net_multiplexer_->NotifyReceiveFd(), bb, 2048); if (nread == 0) { diff --git a/src/net/src/http_conn.cc b/src/net/src/http_conn.cc index 075fef1f37..e485a65a7c 100644 --- a/src/net/src/http_conn.cc +++ b/src/net/src/http_conn.cc @@ -123,7 +123,7 @@ bool HTTPRequest::ParseHeadLine(const char* data, int line_start, int line_end) bool HTTPRequest::ParseGetUrl() { path_ = url_; // Format path - if ((headers_.count("host")) && path_.find(headers_["host"]) != std::string::npos && + if (headers_.count("host") && path_.find(headers_["host"]) != std::string::npos && path_.size() > (7 + headers_["host"].size())) { // http://www.xxx.xxx/path_/to path_.assign(path_.substr(7 + headers_["host"].size())); @@ -206,7 +206,7 @@ int HTTPRequest::ParseHeader() { content_type_.assign(headers_.at("content-type")); } - if ((headers_.count("expect")) && + if (headers_.count("expect") && (headers_.at("expect") == "100-Continue" || headers_.at("expect") == "100-continue")) { reply_100continue_ = true; } @@ -498,7 +498,7 @@ void HTTPResponse::SetHeaders(const std::string& key, const size_t value) { head void HTTPResponse::SetContentLength(uint64_t size) { remain_send_len_ = size; - if ((headers_.count("Content-Length")) || (headers_.count("content-length"))) { + if (headers_.count("Content-Length") || (headers_.count("content-length"))) { return; } SetHeaders("Content-Length", size); diff --git a/src/pika.cc b/src/pika.cc index 9314d38e42..d879d001a3 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -64,7 +64,7 @@ static void PikaGlogInit() { } static void daemonize() { - if (fork() != 0) { + if (fork()) { exit(0); /* parent exits */ } setsid(); /* create a new session */ diff --git a/src/pika_admin.cc b/src/pika_admin.cc index f33bdda1f3..263094432a 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -590,8 +590,7 @@ void ClientCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "No such client"); } - - } +} void ShutdownCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1985,12 +1984,9 @@ void MonitorCmd::Do(std::shared_ptr partition) { LOG(WARNING) << name_ << " weak ptr is empty"; return; } - std::shared_ptr conn = - std::dynamic_pointer_cast(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd()); - assert(conn.get() == conn_repl.get()); - g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn)); - g_pika_server->AddMonitorMessage("OK"); -// Monitor thread will return "OK" + + g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn_repl)); + res_.SetRes(CmdRes::kOk); } void DbsizeCmd::DoInitial() { @@ -2177,7 +2173,7 @@ void SlowlogCmd::Do(std::shared_ptr partition) { } } } - } +} void PaddingCmd::DoInitial() { if (!CheckArg(argv_.size())) { diff --git a/src/pika_bit.cc b/src/pika_bit.cc index ce45783057..dad664ee3a 100644 --- a/src/pika_bit.cc +++ b/src/pika_bit.cc @@ -36,7 +36,7 @@ void BitSetCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidBitInt); return; } - } +} void BitSetCmd::Do(std::shared_ptr partition) { std::string value; @@ -151,7 +151,7 @@ void BitPosCmd::DoInitial() { } } else { res_.SetRes(CmdRes::kSyntaxErr, kCmdNameBitPos); -} + } } void BitPosCmd::Do(std::shared_ptr partition) { diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 34315c2f02..690b8bb66f 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -39,7 +39,7 @@ std::shared_ptr PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st tmp_ptr->res().SetRes(CmdRes::kErrOther, "unknown command \"" + opt + "\""); return tmp_ptr; } - c_ptr->SetConn(std::dynamic_pointer_cast(shared_from_this())); + c_ptr->SetConn(shared_from_this()); c_ptr->SetResp(resp_ptr); // Check authed diff --git a/src/pika_conf.cc b/src/pika_conf.cc index a81272509e..2ab5d65c33 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -135,7 +135,7 @@ Status PikaConf::DelTableSanityCheck(const std::string& table_name) { int PikaConf::Load() { int ret = LoadConf(); - if (ret) { + if (ret) { return ret; } diff --git a/src/pika_geo.cc b/src/pika_geo.cc index d107ffa119..2cb3c0ff91 100644 --- a/src/pika_geo.cc +++ b/src/pika_geo.cc @@ -40,7 +40,7 @@ void GeoAddCmd::DoInitial() { point.latitude = latitude; pos_.push_back(point); } - } +} void GeoAddCmd::Do(std::shared_ptr partition) { std::vector score_members; diff --git a/src/pika_hash.cc b/src/pika_hash.cc index c192d0a1c1..42f9fc74c0 100644 --- a/src/pika_hash.cc +++ b/src/pika_hash.cc @@ -31,7 +31,7 @@ void HDelCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HSetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -51,7 +51,7 @@ void HSetCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HGetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -121,7 +121,7 @@ void HGetallCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HExistsCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -154,7 +154,7 @@ void HIncrbyCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void HIncrbyCmd::Do(std::shared_ptr partition) { int64_t new_value; @@ -168,7 +168,7 @@ void HIncrbyCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HIncrbyfloatCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -193,7 +193,7 @@ void HIncrbyfloatCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HKeysCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -214,7 +214,7 @@ void HKeysCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HLenCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -232,7 +232,7 @@ void HLenCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "something wrong in hlen"); } - } +} void HMgetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -262,7 +262,7 @@ void HMgetCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HMsetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -280,7 +280,7 @@ void HMsetCmd::DoInitial() { for (; index < argc; index += 2) { fvs_.push_back({argv_[index], argv_[index + 1]}); } - } +} void HMsetCmd::Do(std::shared_ptr partition) { rocksdb::Status s = partition->db()->HMSet(key_, fvs_); @@ -289,7 +289,7 @@ void HMsetCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HSetnxCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -328,7 +328,7 @@ void HStrlenCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "something wrong in hstrlen"); } - } +} void HValsCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -350,7 +350,7 @@ void HValsCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HScanCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -389,7 +389,7 @@ void HScanCmd::DoInitial() { res_.SetRes(CmdRes::kSyntaxErr); return; } - } +} void HScanCmd::Do(std::shared_ptr partition) { int64_t next_cursor = 0; @@ -411,7 +411,7 @@ void HScanCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void HScanxCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -447,7 +447,7 @@ void HScanxCmd::DoInitial() { res_.SetRes(CmdRes::kSyntaxErr); return; } - } +} void HScanxCmd::Do(std::shared_ptr partition) { std::string next_field; @@ -467,7 +467,7 @@ void HScanxCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void PKHScanRangeCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -500,7 +500,7 @@ void PKHScanRangeCmd::DoInitial() { } index++; } - } +} void PKHScanRangeCmd::Do(std::shared_ptr partition) { std::string next_field; @@ -520,7 +520,7 @@ void PKHScanRangeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void PKHRScanRangeCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -553,7 +553,7 @@ void PKHRScanRangeCmd::DoInitial() { } index++; } - } +} void PKHRScanRangeCmd::Do(std::shared_ptr partition) { std::string next_field; @@ -573,4 +573,4 @@ void PKHRScanRangeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} diff --git a/src/pika_kv.cc b/src/pika_kv.cc index dc0d665315..111d2077ff 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -49,9 +49,6 @@ void SetCmd::DoInitial() { if (pstd::string2int(argv_[index].data(), argv_[index].size(), &sec_) == 0) { res_.SetRes(CmdRes::kInvalidInt); return; - } else if (sec_ <= 0) { - res_.SetRes(CmdRes::kErrOther, "invalid expire time in set"); - return; } if (strcasecmp(opt.data(), "px") == 0) { @@ -63,7 +60,7 @@ void SetCmd::DoInitial() { } index++; } - } +} void SetCmd::Do(std::shared_ptr partition) { rocksdb::Status s; @@ -170,7 +167,7 @@ void DelCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "delete error"); } - } +} void DelCmd::Split(std::shared_ptr partition, const HintKeys& hint_keys) { std::map type_status; @@ -180,7 +177,7 @@ void DelCmd::Split(std::shared_ptr partition, const HintKeys& hint_ke } else { res_.SetRes(CmdRes::kErrOther, "delete error"); } - } +} void DelCmd::Merge() { res_.AppendInteger(split_res_); @@ -205,7 +202,7 @@ void IncrCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void IncrbyCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -217,7 +214,7 @@ void IncrbyCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt, kCmdNameIncrby); return; } - } +} void IncrbyCmd::Do(std::shared_ptr partition) { rocksdb::Status s = partition->db()->Incrby(key_, by_, &new_value_); @@ -230,7 +227,7 @@ void IncrbyCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void IncrbyfloatCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -243,7 +240,7 @@ void IncrbyfloatCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidFloat); return; } - } +} void IncrbyfloatCmd::Do(std::shared_ptr partition) { rocksdb::Status s = partition->db()->Incrbyfloat(key_, value_, &new_value_); @@ -257,7 +254,7 @@ void IncrbyfloatCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void DecrCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -278,7 +275,7 @@ void DecrCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void DecrbyCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -290,7 +287,7 @@ void DecrbyCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void DecrbyCmd::Do(std::shared_ptr partition) { rocksdb::Status s = partition->db()->Decrby(key_, by_, &new_value_); @@ -303,7 +300,7 @@ void DecrbyCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void GetsetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -327,7 +324,7 @@ void GetsetCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void AppendCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -346,7 +343,7 @@ void AppendCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void MgetCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -374,7 +371,7 @@ void MgetCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void MgetCmd::Split(std::shared_ptr partition, const HintKeys& hint_keys) { std::vector vss; @@ -391,7 +388,7 @@ void MgetCmd::Split(std::shared_ptr partition, const HintKeys& hint_k } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void MgetCmd::Merge() { res_.AppendArrayLen(split_res_.size()); @@ -429,7 +426,7 @@ void KeysCmd::DoInitial() { } else if (argv_.size() > 3) { res_.SetRes(CmdRes::kSyntaxErr); } - } +} void KeysCmd::Do(std::shared_ptr partition) { int64_t total_key = 0; @@ -472,7 +469,7 @@ void SetnxCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} std::string SetnxCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset) { @@ -626,7 +623,7 @@ void MsetCmd::DoInitial() { for (size_t index = 1; index != argc; index += 2) { kvs_.push_back({argv_[index], argv_[index + 1]}); } - } + } void MsetCmd::Do(std::shared_ptr partition) { storage::Status s = partition->db()->MSet(kvs_); @@ -677,7 +674,7 @@ void MsetnxCmd::DoInitial() { for (size_t index = 1; index != argc; index += 2) { kvs_.push_back({argv_[index], argv_[index + 1]}); } - } +} void MsetnxCmd::Do(std::shared_ptr partition) { success_ = 0; @@ -703,7 +700,7 @@ void GetrangeCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void GetrangeCmd::Do(std::shared_ptr partition) { std::string substr; @@ -737,7 +734,7 @@ void SetrangeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void StrlenCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -755,7 +752,7 @@ void StrlenCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void ExistsCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -774,7 +771,7 @@ void ExistsCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "exists internal error"); } - } +} void ExistsCmd::Split(std::shared_ptr partition, const HintKeys& hint_keys) { std::map type_status; @@ -800,7 +797,7 @@ void ExpireCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void ExpireCmd::Do(std::shared_ptr partition) { std::map type_status; @@ -810,7 +807,7 @@ void ExpireCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "expire internal error"); } - } +} std::string ExpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset) { @@ -847,7 +844,7 @@ void PexpireCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void PexpireCmd::Do(std::shared_ptr partition) { std::map type_status; @@ -857,7 +854,7 @@ void PexpireCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "expire internal error"); } - } +} std::string PexpireCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset) { @@ -894,7 +891,7 @@ void ExpireatCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void ExpireatCmd::Do(std::shared_ptr partition) { std::map type_status; @@ -916,7 +913,7 @@ void PexpireatCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} std::string PexpireatCmd::ToBinlog(uint32_t exec_time, uint32_t term_id, uint64_t logic_id, uint32_t filenum, uint64_t offset) { @@ -951,7 +948,7 @@ void PexpireatCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "pexpireat internal error"); } - } +} void TtlCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -986,7 +983,7 @@ void TtlCmd::Do(std::shared_ptr partition) { // mean this key not exist res_.AppendInteger(-2); } - } +} void PttlCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1041,7 +1038,7 @@ void PttlCmd::Do(std::shared_ptr partition) { // mean this key not exist res_.AppendInteger(-2); } - } +} void PersistCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1059,7 +1056,7 @@ void PersistCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "persist internal error"); } - } +} void TypeCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1077,7 +1074,7 @@ void TypeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void ScanCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1111,7 +1108,7 @@ void ScanCmd::DoInitial() { } index++; } - } +} void ScanCmd::Do(std::shared_ptr partition) { int64_t total_key = 0; @@ -1225,7 +1222,7 @@ void PKSetexAtCmd::DoInitial() { res_.SetRes(CmdRes::kInvalidInt); return; } - } +} void PKSetexAtCmd::Do(std::shared_ptr partition) { rocksdb::Status s = partition->db()->PKSetexAt(key_, value_, time_stamp_); @@ -1234,7 +1231,7 @@ void PKSetexAtCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void PKScanRangeCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1288,7 +1285,7 @@ void PKScanRangeCmd::DoInitial() { } index++; } - } +} void PKScanRangeCmd::Do(std::shared_ptr partition) { std::string next_key; @@ -1319,7 +1316,7 @@ void PKScanRangeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void PKRScanRangeCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -1373,7 +1370,7 @@ void PKRScanRangeCmd::DoInitial() { } index++; } - } +} void PKRScanRangeCmd::Do(std::shared_ptr partition) { std::string next_key; @@ -1404,4 +1401,4 @@ void PKRScanRangeCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} diff --git a/src/pika_rsync_service.cc b/src/pika_rsync_service.cc index 780d782d12..5f1d8c0e6b 100644 --- a/src/pika_rsync_service.cc +++ b/src/pika_rsync_service.cc @@ -43,12 +43,12 @@ int PikaRsyncService::StartRsync() { auth = g_pika_conf->masterauth(); } ret = pstd::StartRsync(raw_path_, kDBSyncModule, "0.0.0.0", port_, auth); - if (ret) { + if (ret) { LOG(WARNING) << "Failed to start rsync, path:" << raw_path_ << " error : " << ret; return -1; } ret = CreateSecretFile(); - if (ret) { + if (ret) { LOG(WARNING) << "Failed to create secret file"; return -1; } diff --git a/src/pika_server.cc b/src/pika_server.cc index 1eb19bfa2e..c0351e3511 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -71,7 +71,6 @@ PikaServer::PikaServer() LOG(INFO) << "Worker queue limit is " << worker_queue_limit; pika_dispatch_thread_ = std::make_unique(ips, port_, worker_num_, 3000, worker_queue_limit, g_pika_conf->max_conn_rbuf_size()); - pika_monitor_thread_ = std::make_unique(); pika_rsync_service_ = std::make_unique(g_pika_conf->db_sync_path(), g_pika_conf->port() + kPortShiftRSync); pika_pubsub_thread_ = std::make_unique(); pika_auxiliary_thread_ = std::make_unique(); @@ -931,7 +930,7 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::stri int ret = 0; LOG(INFO) << "Partition: " << partition->GetPartitionName() << " Start Send files in " << bg_path << " to " << ip; ret = pstd::GetChildren(bg_path, descendant); - if (ret) { + if (ret) { std::string ip_port = pstd::IpPortString(ip, port); std::lock_guard ldb(db_sync_protector_); db_sync_slaves_.erase(ip_port); @@ -1008,7 +1007,7 @@ void PikaServer::DbSyncSendFile(const std::string& ip, int port, const std::stri } ret = pstd::RsyncSendFile(fn, remote_path + "/" + kBgsaveInfoFile, secret_file_path, remote); pstd::DeleteFile(fn); - if (ret) { + if (ret) { LOG(WARNING) << "Partition: " << partition->GetPartitionName() << " Send Modified Info File Failed"; } } else if (0 != (ret = pstd::RsyncSendFile(bg_path + "/" + kBgsaveInfoFile, remote_path + "/" + kBgsaveInfoFile, @@ -1040,13 +1039,10 @@ void PikaServer::KeyScanTaskSchedule(net::TaskFunc func, void* arg) { key_scan_thread_.Schedule(func, arg); } -void PikaServer::ClientKillAll() { - pika_dispatch_thread_->ClientKillAll(); - pika_monitor_thread_->ThreadClientKill(); -} +void PikaServer::ClientKillAll() { pika_dispatch_thread_->ClientKillAll(); } int PikaServer::ClientKill(const std::string& ip_port) { - if (pika_dispatch_thread_->ClientKill(ip_port) || pika_monitor_thread_->ThreadClientKill(ip_port)) { + if (pika_dispatch_thread_->ClientKill(ip_port)) { return 1; } return 0; @@ -1055,18 +1051,44 @@ int PikaServer::ClientKill(const std::string& ip_port) { int64_t PikaServer::ClientList(std::vector* clients) { int64_t clients_num = 0; clients_num += pika_dispatch_thread_->ThreadClientList(clients); - clients_num += pika_monitor_thread_->ThreadClientList(clients); return clients_num; } -bool PikaServer::HasMonitorClients() { return pika_monitor_thread_->HasMonitorClients(); } +bool PikaServer::HasMonitorClients() const { + std::unique_lock lock(monitor_mutex_protector_); + return !pika_monitor_clients_.empty(); +} void PikaServer::AddMonitorMessage(const std::string& monitor_message) { - pika_monitor_thread_->AddMonitorMessage(monitor_message); + const std::string msg = "+" + monitor_message + "\r\n"; + + std::vector> clients; + + std::unique_lock lock(monitor_mutex_protector_); + clients.reserve(pika_monitor_clients_.size()); + for (auto it = pika_monitor_clients_.begin(); it != pika_monitor_clients_.end();) { + auto cli = (*it).lock(); + if (cli) { + clients.push_back(std::move(cli)); + ++it; + } else { + it = pika_monitor_clients_.erase(it); + } + } + + lock.unlock(); // SendReply without lock + + for (const auto& cli : clients) { + cli->WriteResp(msg); + cli->SendReply(); + } } -void PikaServer::AddMonitorClient(const std::shared_ptr &client_ptr) { - pika_monitor_thread_->AddMonitorClient(client_ptr); +void PikaServer::AddMonitorClient(std::shared_ptr client_ptr) { + if (client_ptr) { + std::unique_lock lock(monitor_mutex_protector_); + pika_monitor_clients_.insert(client_ptr); + } } void PikaServer::SlowlogTrim() { diff --git a/src/pika_set.cc b/src/pika_set.cc index 53401d8f7d..ec2d2b9b02 100644 --- a/src/pika_set.cc +++ b/src/pika_set.cc @@ -51,8 +51,7 @@ void SPopCmd::DoInitial() { return; } } - - } +} void SPopCmd::Do(std::shared_ptr partition) { std::vector members; @@ -68,7 +67,7 @@ void SPopCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void SCardCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -86,7 +85,7 @@ void SCardCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, "scard error"); } - } +} void SMembersCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -108,7 +107,7 @@ void SMembersCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void SScanCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -146,7 +145,7 @@ void SScanCmd::DoInitial() { res_.SetRes(CmdRes::kSyntaxErr); return; } - } +} void SScanCmd::Do(std::shared_ptr partition) { int64_t next_cursor = 0; @@ -203,7 +202,7 @@ void SUnionCmd::Do(std::shared_ptr partition) { res_.AppendStringLen(member.size()); res_.AppendContent(member); } - } +} void SUnionstoreCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -224,7 +223,7 @@ void SUnionstoreCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void SInterCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -243,7 +242,7 @@ void SInterCmd::Do(std::shared_ptr partition) { res_.AppendStringLen(member.size()); res_.AppendContent(member); } - } +} void SInterstoreCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -264,7 +263,7 @@ void SInterstoreCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void SIsmemberCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -302,7 +301,7 @@ void SDiffCmd::Do(std::shared_ptr partition) { res_.AppendStringLen(member.size()); res_.AppendContent(member); } - } +} void SDiffstoreCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -343,7 +342,7 @@ void SMoveCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} void SRandmemberCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -362,7 +361,7 @@ void SRandmemberCmd::DoInitial() { ; } } - } +} void SRandmemberCmd::Do(std::shared_ptr partition) { std::vector members; @@ -381,4 +380,4 @@ void SRandmemberCmd::Do(std::shared_ptr partition) { } else { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - } +} diff --git a/src/pstd/src/posix.cc b/src/pstd/src/posix.cc index e4ee345c0c..390c33a182 100644 --- a/src/pstd/src/posix.cc +++ b/src/pstd/src/posix.cc @@ -69,7 +69,7 @@ void Setpgid(pid_t pid, pid_t pgid) { if ((rc = setpgid(pid, pgid)) < 0) { LOG(ERROR) << "Setpgid error: " << strerror(errno); } - } +} pid_t Getpgrp() { return getpgrp(); } @@ -97,31 +97,31 @@ void Sigprocmask(int how, const sigset_t* set, sigset_t* oldset) { if (sigprocmask(how, set, oldset) < 0) { LOG(ERROR) << "Sigprocmask error: " << strerror(errno); } - } +} void Sigemptyset(sigset_t* set) { if (sigemptyset(set) < 0) { LOG(ERROR) << "Sigemptyset error: " << strerror(errno); } - } +} void Sigfillset(sigset_t* set) { if (sigfillset(set) < 0) { LOG(ERROR) << "Sigfillset error: " << strerror(errno); } - } +} void Sigaddset(sigset_t* set, int signum) { if (sigaddset(set, signum) < 0) { LOG(ERROR) << "Sigaddset error: " << strerror(errno); } - } +} void Sigdelset(sigset_t* set, int signum) { if (sigdelset(set, signum) < 0) { LOG(ERROR) << "Sigdelset error: " << strerror(errno); } - } +} int Sigismember(const sigset_t* set, int signum) { int rc; diff --git a/src/storage/src/db_checkpoint.cc b/src/storage/src/db_checkpoint.cc index 4a21ad709d..0490d62a41 100644 --- a/src/storage/src/db_checkpoint.cc +++ b/src/storage/src/db_checkpoint.cc @@ -112,7 +112,7 @@ Status DBCheckpointImpl::CreateCheckpointWithFiles(const std::string& checkpoint // make wal_dir valid in that case std::string wal_dir = db_->GetOptions().wal_dir; if (wal_dir.empty()) { - wal_dir = db_->GetName() + "/"; + wal_dir = db_->GetOptions().db_paths[0].path; } size_t wal_size = live_wal_files.size(); diff --git a/src/storage/src/redis.cc b/src/storage/src/redis.cc index a96f558d7f..3691293912 100644 --- a/src/storage/src/redis.cc +++ b/src/storage/src/redis.cc @@ -80,8 +80,9 @@ Status Redis::SetOptions(const OptionType& option_type, const std::unordered_map Status s; for (auto handle : handles_) { s = db_->SetOptions(handle, options); - if (!s.ok()) { break; -} + if (!s.ok()) { + break; + } } return s; } diff --git a/src/storage/src/redis_strings.h b/src/storage/src/redis_strings.h index 898f1f03fb..a6d9143c1d 100644 --- a/src/storage/src/redis_strings.h +++ b/src/storage/src/redis_strings.h @@ -44,7 +44,7 @@ class RedisStrings : public Redis { Status MSetnx(const std::vector& kvs, int32_t* ret); Status Set(const Slice& key, const Slice& value); Status Setxx(const Slice& key, const Slice& value, int32_t* ret, int32_t ttl = 0); - Status SetBit(const Slice& key, int64_t offset, int32_t on, int32_t* ret); + Status SetBit(const Slice& key, int64_t offset, int32_t value, int32_t* ret); Status Setex(const Slice& key, const Slice& value, int32_t ttl); Status Setnx(const Slice& key, const Slice& value, int32_t* ret, int32_t ttl = 0); Status Setvx(const Slice& key, const Slice& value, const Slice& new_value, int32_t* ret, int32_t ttl = 0);