diff --git a/include/pika_rm.h b/include/pika_rm.h index 61821eab54..0d1c14755e 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -161,7 +161,7 @@ class SyncSlaveSlot : public SyncSlot { void ActivateRsync(); private: - std::unique_ptr sync_cli_; + std::unique_ptr rsync_cli_; pstd::Mutex slot_mu_; RmNode m_info_; ReplState repl_state_{kNoConnect}; diff --git a/include/rsync_client.h b/include/rsync_client.h index 3e636a60b9..57999717dd 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -17,6 +17,7 @@ #include "net/include/net_cli.h" #include "pstd/include/env.h" #include "pstd/include/pstd_hash.h" +#include "pstd/include/pstd_string.h" #include "pstd/include/pstd_status.h" #include "rsync_service.pb.h" #include "throttle.h" diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 16cb2d1ab5..d2663ee219 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -538,7 +538,9 @@ Status SyncMasterSlot::ConsensusReset(const LogOffset& applied_offset) { return /* SyncSlaveSlot */ SyncSlaveSlot::SyncSlaveSlot(const std::string& db_name, uint32_t slot_id) : SyncSlot(db_name, slot_id) { - rsync_cli_.reset(new rsync::RsyncClient(dbsync_path_, db_name, slot_id)); + //TODO: get dbsync_path from slot class + std::string dbsync_path = g_pika_conf->db_sync_path() + "/" + db_name; + rsync_cli_.reset(new rsync::RsyncClient(dbsync_path, db_name, slot_id)); m_info_.SetLastRecvTime(pstd::NowMicros()); } @@ -643,7 +645,7 @@ void SyncSlaveSlot::ActivateRsync() { return; } if (rsync_cli_->Init(local_ip_)) { - rsnyc_cli_->Start(); + rsync_cli_->Start(); } } @@ -1152,7 +1154,7 @@ Status PikaReplicaManager::RunSyncSlaveSlotStateMachine() { std::shared_ptr slot = g_pika_server->GetDBSlotById(p_info.db_name_, p_info.slot_id_); if (slot) { - slot->ActivateRsync(); + s_slot->ActivateRsync(); slot->TryUpdateMasterOffset(); } else { LOG(WARNING) << "Slot not found, DB Name: " << p_info.db_name_ diff --git a/src/rsync_client.cc b/src/rsync_client.cc index d7833f2304..c487372e53 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -10,13 +10,13 @@ using namespace RsyncService; using namespace pstd; namespace rsync { -RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const size_t slot_id) - : dir_ (dir), flush_period_(100), db_name_(ip), slot_id_(port), state_(IDLE), - db_name_(""), slot_id_(0), max_retries_(10) { +RsyncClient::RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id) + : dir_ (dir), flush_period_(100), db_name_(db_name), slot_id_(slot_id), state_(IDLE), + max_retries_(10) { client_thread_ = std::make_unique(10 * 1000, 60 * 1000, this); } -bool RsyncClient::Init(const std::string ip_port) { +bool RsyncClient::Init(const std::string& ip_port) { if (!ParseIpPortString(ip_port, ip_, port_)) { LOG(WARNING) << "Parse ip_port error " << ip_port; return false;