-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature: sync all snapshot data in Pika process without rsync subprocess #1805
Changes from all commits
73dc198
4b92d87
c93f5d6
8ba038f
dd51cde
4bbd2e6
d23e398
7785b42
e4aa948
26d4db8
547530c
7bac336
b2f4091
1b4b979
fa96b9d
84e69da
3fe94e4
0d35c3b
1130130
612f5e4
63c4a7e
dc86779
002d34d
1918b90
9583007
852b8d6
f39a024
ce861f7
bbf6585
17217ac
44c57ed
5983379
fe25080
6fa3717
f48726b
1e7750b
c5936b2
afd133a
4a54423
bbd489c
17095de
2090367
2be71f4
01b6a22
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,7 +34,8 @@ class PikaServer; | |
/* Port shift */ | ||
const int kPortShiftRSync = 1000; | ||
const int kPortShiftReplServer = 2000; | ||
|
||
//TODO: Temporarily used for rsync server port shift. will be deleted. | ||
const int kPortShiftRsync2 = 10001; | ||
const std::string kPikaPidFile = "pika.pid"; | ||
const std::string kPikaSecretFile = "rsync.secret"; | ||
const std::string kDefaultRsyncAuth = "default"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the code patch you provided, here are some observations:
Overall, without further context or the complete code, it is difficult to identify any potential bugs or suggest additional improvements beyond what has been mentioned. A comprehensive code review requires examining the broader codebase and understanding the overall architecture and requirements. |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,152 @@ | ||
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. | ||
// This source code is licensed under the BSD-style license found in the | ||
// LICENSE file in the root directory of this source tree. An additional grant | ||
// of patent rights can be found in the PATENTS file in the same directory. | ||
|
||
#ifndef RSYNC_CLIENT_H_ | ||
#define RSYNC_CLIENT_H_ | ||
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
#include <glog/logging.h> | ||
#include <fcntl.h> | ||
#include <sys/stat.h> | ||
#include <sys/types.h> | ||
#include <list> | ||
#include <atomic> | ||
#include <memory> | ||
#include <thread> | ||
#include <condition_variable> | ||
|
||
#include "net/include/bg_thread.h" | ||
#include "net/include/net_cli.h" | ||
#include "pstd/include/env.h" | ||
#include "pstd/include/pstd_status.h" | ||
#include "pstd/include/pstd_hash.h" | ||
#include "pstd/include/pstd_string.h" | ||
#include "pstd/include/pstd_status.h" | ||
#include "include/rsync_client_thread.h" | ||
#include "include/throttle.h" | ||
#include "rsync_service.pb.h" | ||
|
||
const std::string kDumpMetaFileName = "DUMP_META_DATA"; | ||
const std::string kUuidPrefix = "snapshot-uuid:"; | ||
|
||
namespace rsync { | ||
|
||
class RsyncWriter; | ||
class Session; | ||
class WaitObject; | ||
|
||
class RsyncClient : public net::Thread { | ||
public: | ||
enum State { | ||
IDLE, | ||
RUNNING, | ||
STOP, | ||
}; | ||
RsyncClient(const std::string& dir, const std::string& db_name, const uint32_t slot_id); | ||
void* ThreadMain() override; | ||
bool Init(); | ||
Status Start(); | ||
Status Stop(); | ||
bool IsRunning() { | ||
return state_.load() == RUNNING; | ||
} | ||
bool IsIdle() { return state_.load() == IDLE;} | ||
void OnReceive(RsyncService::RsyncResponse* resp); | ||
|
||
private: | ||
bool Recover(); | ||
Status Wait(RsyncService::RsyncResponse*& resp); | ||
Status CopyRemoteFile(const std::string& filename); | ||
Status CopyRemoteMeta(std::string* snapshot_uuid, std::set<std::string>* file_set); | ||
Status LoadLocalMeta(std::string* snapshot_uuid, std::map<std::string, std::string>* file_map); | ||
std::string GetLocalMetaFilePath(); | ||
AlexStocks marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Status FlushMetaTable(); | ||
Status CleanUpExpiredFiles(bool need_reset_path, const std::set<std::string>& files); | ||
Status UpdateLocalMeta(const std::string& snapshot_uuid, const std::set<std::string>& expired_files, std::map<std::string, std::string>* localFileMap); | ||
void HandleRsyncMetaResponse(RsyncService::RsyncResponse* response); | ||
|
||
private: | ||
std::map<std::string, std::string> meta_table_; | ||
int flush_period_ = 10; | ||
std::set<std::string> file_set_; | ||
std::string snapshot_uuid_; | ||
std::string dir_; | ||
std::string db_name_; | ||
uint32_t slot_id_ = 0; | ||
std::unique_ptr<RsyncClientThread> client_thread_; | ||
std::atomic<State> state_; | ||
int max_retries_ = 10; | ||
std::unique_ptr<WaitObject> wo_; | ||
std::condition_variable cond_; | ||
std::mutex mu_; | ||
std::unique_ptr<Throttle> throttle_; | ||
std::string master_ip_; | ||
int master_port_; | ||
}; | ||
|
||
class RsyncWriter { | ||
public: | ||
RsyncWriter(const std::string& filepath) { | ||
filepath_ = filepath; | ||
fd_ = open(filepath.c_str(), O_RDWR | O_APPEND | O_CREAT, 0644); | ||
} | ||
~RsyncWriter() {} | ||
Status Write(uint64_t offset, size_t n, const char* data) { | ||
const char* ptr = data; | ||
size_t left = n; | ||
Status s; | ||
while (left != 0) { | ||
ssize_t done = write(fd_, ptr, left); | ||
if (done < 0) { | ||
if (errno == EINTR) { | ||
continue; | ||
} | ||
LOG(WARNING) << "pwrite failed, filename: " << filepath_ << "errno: " << strerror(errno) << "n: " << n; | ||
return Status::IOError(filepath_, "pwrite failed"); | ||
} | ||
left -= done; | ||
ptr += done; | ||
offset += done; | ||
} | ||
return Status::OK(); | ||
} | ||
Status Close() { | ||
close(fd_); | ||
return Status::OK(); | ||
} | ||
Status Fsync() { | ||
fsync(fd_); | ||
return Status::OK(); | ||
} | ||
|
||
private: | ||
std::string filepath_; | ||
int fd_ = -1; | ||
}; | ||
|
||
class WaitObject { | ||
public: | ||
WaitObject() : filename_(""), type_(RsyncService::kRsyncMeta), offset_(0), resp_(nullptr) {} | ||
~WaitObject() {} | ||
void Reset(const std::string& filename, RsyncService::Type t, size_t offset) { | ||
resp_ = nullptr; | ||
filename_ = filename; | ||
type_ = t; | ||
offset_ = offset; | ||
} | ||
void Reset(RsyncService::Type t) { | ||
resp_ = nullptr; | ||
filename_ = ""; | ||
type_ = t; | ||
offset_ = 0xFFFFFFFF; | ||
} | ||
std::string filename_; | ||
RsyncService::Type type_; | ||
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
size_t offset_ = 0xFFFFFFFF; | ||
RsyncService::RsyncResponse* resp_ = nullptr; | ||
}; | ||
|
||
} // end namespace rsync | ||
#endif | ||
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. | ||
// This source code is licensed under the BSD-style license found in the | ||
// LICENSE file in the root directory of this source tree. An additional grant | ||
// of patent rights can be found in the PATENTS file in the same directory. | ||
|
||
#ifndef RSYNC_CLIENT_THREAD_H_ | ||
#define RSYNC_CLIENT_THREAD_H_ | ||
|
||
#include "net/include/client_thread.h" | ||
#include "net/include/net_conn.h" | ||
#include "net/include/pb_conn.h" | ||
#include "rsync_service.pb.h" | ||
|
||
using namespace pstd; | ||
using namespace net; | ||
|
||
namespace rsync { | ||
|
||
class RsyncClientConn : public PbConn { | ||
public: | ||
RsyncClientConn(int fd, const std::string& ip_port, | ||
net::Thread* thread, void* cb_handler, | ||
NetMultiplexer* mpx); | ||
~RsyncClientConn() override; | ||
int DealMessage() override; | ||
|
||
private: | ||
void* cb_handler_ = nullptr; | ||
}; | ||
|
||
class RsyncClientConnFactory : public ConnFactory { | ||
public: | ||
RsyncClientConnFactory(void* scheduler) : cb_handler_(scheduler) {} | ||
std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port, | ||
net::Thread* thread, void* cb_handler, | ||
net::NetMultiplexer* net) const override { | ||
return std::static_pointer_cast<net::NetConn>( | ||
std::make_shared<RsyncClientConn>(connfd, ip_port, thread, cb_handler_, net)); | ||
} | ||
private: | ||
void* cb_handler_ = nullptr; | ||
}; | ||
|
||
class RsyncClientThread : public ClientThread { | ||
public: | ||
RsyncClientThread(int cron_interval, int keepalive_timeout, void* scheduler); | ||
~RsyncClientThread() override; | ||
private: | ||
RsyncClientConnFactory conn_factory_; | ||
ClientHandle handle_; | ||
}; | ||
|
||
} //end namespace rsync | ||
#endif | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The provided code patch appears to be a header file related to a client thread for an rsync application. Here are some suggestions and areas to consider in your code review:
Remember that a comprehensive code review requires access to the complete codebase including related implementation files, so these suggestions may not cover all aspects of your project. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
// Copyright (c) 2023-present, Qihoo, Inc. All rights reserved. | ||
// This source code is licensed under the BSD-style license found in the | ||
// LICENSE file in the root directory of this source tree. An additional grant | ||
// of patent rights can be found in the PATENTS file in the same directory. | ||
|
||
#ifndef RSYNC_SERVER_H_ | ||
#define RSYNC_SERVER_H_ | ||
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
#include <stdio.h> | ||
#include <unistd.h> | ||
|
||
#include "net/include/net_conn.h" | ||
#include "net/include/net_thread.h" | ||
#include "net/include/pb_conn.h" | ||
#include "net/include/server_thread.h" | ||
#include "net/include/thread_pool.h" | ||
#include "net/src/holy_thread.h" | ||
#include "net/src/net_multiplexer.h" | ||
#include "pstd/include/env.h" | ||
#include "pstd_hash.h" | ||
#include "rsync_service.pb.h" | ||
|
||
namespace rsync { | ||
struct RsyncServerTaskArg { | ||
std::shared_ptr<RsyncService::RsyncRequest> req; | ||
std::shared_ptr<net::PbConn> conn; | ||
RsyncServerTaskArg(std::shared_ptr<RsyncService::RsyncRequest> _req, std::shared_ptr<net::PbConn> _conn) | ||
: req(std::move(_req)), conn(std::move(_conn)) {} | ||
}; | ||
class RsyncReader; | ||
class RsyncServerThread; | ||
|
||
class RsyncServer { | ||
public: | ||
RsyncServer(const std::set<std::string>& ips, const int port); | ||
~RsyncServer(); | ||
void Schedule(net::TaskFunc func, void* arg); | ||
int Start(); | ||
int Stop(); | ||
private: | ||
std::unique_ptr<net::ThreadPool> work_thread_; | ||
std::unique_ptr<RsyncServerThread> rsync_server_thread_; | ||
}; | ||
|
||
class RsyncServerConn : public net::PbConn { | ||
public: | ||
RsyncServerConn(int connfd, const std::string& ip_port, | ||
net::Thread* thread, void* worker_specific_data, | ||
net::NetMultiplexer* mpx); | ||
virtual ~RsyncServerConn() override; | ||
int DealMessage() override; | ||
static void HandleMetaRsyncRequest(void* arg); | ||
static void HandleFileRsyncRequest(void* arg); | ||
private: | ||
void* data_ = nullptr; | ||
}; | ||
|
||
class RsyncServerThread : public net::HolyThread { | ||
public: | ||
RsyncServerThread(const std::set<std::string>& ips, int port, int cron_internal, RsyncServer* arg); | ||
~RsyncServerThread(); | ||
|
||
private: | ||
class RsyncServerConnFactory : public net::ConnFactory { | ||
public: | ||
explicit RsyncServerConnFactory(RsyncServer* sched) : scheduler_(sched) {} | ||
|
||
std::shared_ptr<net::NetConn> NewNetConn(int connfd, const std::string& ip_port, | ||
net::Thread* thread, void* worker_specific_data, | ||
net::NetMultiplexer* net) const override { | ||
return std::static_pointer_cast<net::NetConn>( | ||
std::make_shared<RsyncServerConn>(connfd, ip_port, thread, scheduler_, net)); | ||
} | ||
private: | ||
RsyncServer* scheduler_ = nullptr; | ||
}; | ||
class RsyncServerHandle : public net::ServerHandle { | ||
public: | ||
void FdClosedHandle(int fd, const std::string& ip_port) const override; | ||
void FdTimeoutHandle(int fd, const std::string& ip_port) const override; | ||
bool AccessHandle(int fd, std::string& ip) const override; | ||
void CronHandle() const override; | ||
}; | ||
private: | ||
RsyncServerConnFactory conn_factory_; | ||
RsyncServerHandle handle_; | ||
}; | ||
|
||
} //end namespace rsync | ||
#endif | ||
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
wangshao1 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on the provided code patch, here is a brief code review:
The addition of a new constant
kPortShiftRsync2
and the accompanying comment suggest that it is temporarily used for rsync server port shifting. Ensure that this temporary usage is properly communicated and documented to prevent confusion.Overall, the code change seems to be straightforward and doesn't introduce any obvious bug risks.
Improvement suggestions:
Consider using more descriptive names for constants, such as
kPortShiftRsyncTemporary
, instead ofkPortShiftRsync2
. This will make the code more readable and maintainable.Ensure that the use of this temporary variable
kPortShiftRsync2
is properly removed or updated once its temporary usage is complete, to avoid leaving unused or misleading code in the system.It's always good practice to include a comment explaining the reason behind certain code changes, especially when introducing temporary variables/constants or making non-obvious modifications that deviate from the existing codebase.
Remember to thoroughly test the code after making these changes to ensure proper functionality.