Merge pull request #1 from ppLorins/develop
1. improve & finish benchmark related logics.
ppLorins authored Nov 2, 2019
2 parents 8393690 + 35a47da commit 4f4264c
Showing 70 changed files with 752 additions and 493 deletions.
32 changes: 22 additions & 10 deletions Makefile
@@ -2,7 +2,7 @@
BUILD_TYPE=debug

CXX = g++
CXXFLAGS = -std=c++17 -D_RAFT_UNIT_TEST_
CXXFLAGS = -std=c++17

ifeq ($(BUILD_TYPE),debug)
CXXFLAGS += -g3
@@ -13,9 +13,14 @@ endif
CXXFLAGS += -O3
endif

CXXFLAGS_UTEST = $(CXXFLAGS) -D_RAFT_UNIT_TEST_

SRCDIR = src
BINDIR = bin
OBJDIR = $(BINDIR)/$(BUILD_TYPE)/object
OBJ_EXE_SUB_DIR = $(OBJDIR)/release-version
OBJ_UTEST_SUB_DIR = $(OBJDIR)/unit-test-version


THIRD_PARTY_DIR=./third_party

@@ -61,7 +66,7 @@ all: system-check $(MAIN_PROGRAM) $(MAIN_TEST)

.PHONY: prepare
prepare:
mkdir -p $(OBJDIR)
mkdir -p $(OBJ_EXE_SUB_DIR) $(OBJ_UTEST_SUB_DIR)

ALL_SRC_FILES=$(wildcard src/*.cc src/*/*.cc)
TPL_CC_FILES=%src/tools/lock_free_deque.cc %src/tools/lock_free_hash.cc \
@@ -78,28 +83,35 @@ TPL_CC_FILES=%src/tools/lock_free_deque.cc %src/tools/lock_free_hash.cc \

COMPILE_SRC_FILES = $(filter-out $(TPL_CC_FILES), $(ALL_SRC_FILES) )

OBJ = $(patsubst %.cc, $(OBJDIR)/%.o, $(COMPILE_SRC_FILES))
OBJ_EXE = $(patsubst %.cc, $(OBJ_EXE_SUB_DIR)/%.o, $(COMPILE_SRC_FILES))
OBJ_UTEST = $(patsubst %.cc, $(OBJ_UTEST_SUB_DIR)/%.o, $(COMPILE_SRC_FILES))

EXE_MAIN_OBJ=%/main.o
UTEST_MAIN_OBJ=%gtest_main.o
EXE_OBJ = $(filter-out $(UTEST_MAIN_OBJ), $(OBJ) )
UTEST_OBJ = $(filter-out $(EXE_MAIN_OBJ), $(OBJ) )
UTEST_MAIN_OBJ=%/gtest_main.o

EXE_OBJ = $(filter-out $(UTEST_MAIN_OBJ), $(OBJ_EXE) )
UTEST_OBJ = $(filter-out $(EXE_MAIN_OBJ), $(OBJ_UTEST) )

.PHONY:test
test:$(PROTO_FLAG)
@echo "all:" $(ALL_SRC_FILES)
@echo "src:" $(COMPILE_SRC_FILES)
@echo "object:" $(OBJ)
@echo "object-exe:" $(OBJ_EXE)
@echo "object-utest:" $(OBJ_UTEST)

$(OBJDIR)/%.o: %.cc
@mkdir -p $(OBJDIR)/$(dir $<)
$(OBJ_EXE_SUB_DIR)/%.o: %.cc
@mkdir -p $(OBJ_EXE_SUB_DIR)/$(dir $<)
$(CXX) $(CXXFLAGS) $(INC) -c $< -o $@

$(OBJ_UTEST_SUB_DIR)/%.o: %.cc
@mkdir -p $(OBJ_UTEST_SUB_DIR)/$(dir $<)
$(CXX) $(CXXFLAGS_UTEST) $(INC) -c $< -o $@

$(MAIN_PROGRAM): $(EXE_OBJ)
$(CXX) $(CXXFLAGS) $^ $(LIB) -o $@

$(MAIN_TEST): $(UTEST_OBJ)
$(CXX) $(CXXFLAGS) $^ $(LIB) -o $@
$(CXX) $(CXXFLAGS_UTEST) $^ $(LIB) -o $@

PROTOC = protoc
GRPC_CPP_PLUGIN = grpc_cpp_plugin
36 changes: 27 additions & 9 deletions doc/benchmark.md
@@ -5,7 +5,7 @@ This is the environment for benchmark.
### Machines:

|machine|OS|Compiler|CPU|Memory|Disk|
|--|--|--|--|
|--|--|--|--|--|--|
|machineA|Win10 64 Pro|MSVC 19.00.24215.1 for x86|i7 8(4*2)core 4.0GHz|16G|SSD|
|machineB|Mac 64 High Sierra 10.13.6|Apple LLVM version 10.0.0 (clang-1000.10.44.4)|i7 8(4*2)core 2.2GHz|16G|SSD|

@@ -20,16 +20,26 @@ Leader is deployed on machineA, all followers are deployed on machineB.
### Config:

#### leader config:
--do_heartbeat=false --iterating_wait_timeo_us=2000000 --port=10010 --leader_append_entries_rpc_timeo_ms=5000 --leader_commit_entries_rpc_timeo_ms=5000 --client_cq_num=2 --client_thread_num=2 --notify_cq_num=2 --notify_cq_threads=4 --call_cq_num=2 --call_cq_threads=2 --iterating_threads=2 --client_pool_size=50000
--do_heartbeat=false --iterating_wait_timeo_us=2000000 --port=10010 --leader_append_entries_rpc_timeo_ms=5000 --leader_commit_entries_rpc_timeo_ms=5000 --client_cq_num=2 --client_thread_num=2 --notify_cq_num=2 --notify_cq_threads=4 --call_cq_num=2 --call_cq_threads=2 --iterating_threads=2 --client_pool_size=400000

#### follower config:
--checking_heartbeat=false --iterating_wait_timeo_us=50000 --disorder_msg_timeo_ms=100000 --port=${port} --notify_cq_num=1 --notify_cq_threads=4 --call_cq_num=1 --call_cq_threads=4 --iterating_threads=2
--checking_heartbeat=false --iterating_wait_timeo_us=50000 --disorder_msg_timeo_ms=100000 --port=${port} --notify_cq_num=1 --notify_cq_threads=4 --call_cq_num=1 --call_cq_threads=4 --iterating_threads=2

### Logic

Using the `TestLeaderServiceClient.Benchmark` test case in `src\gtest\service\test_leader_service.h` with arguments: --gtest_filter=TestLeaderServiceClient.Benchmark --client_write_timo_ms=10000 --benchmark_client_cq_num=1 --benchmark_client_thread_num_per_cq=1 --value_len=1 --leader_svc_benchmark_req_count=20000
Using the `TestLeaderServiceClient.Benchmark` test case in `src\gtest\service\test_leader_service.h` with arguments: --gtest_filter=TestLeaderServiceClient.Benchmark --client_write_timo_ms=5000 --benchmark_client_cq_num=1 --benchmark_client_polling_thread_num_per_cq=2 --leader_svc_benchmark_req_count=80000 --benchmark_client_entrusting_thread_num=1

Which is, using asynchronous way of sending 2w requests each time, counting overall write throughput & average write latency. Reading performance is not considered. The influential factors being tested are : **data length** and **number of followers**.
That is, sending 8w (80,000) requests in an asynchronous way and counting the overall write throughput. (Read performance is not considered here.) The influential factors being tested are: **data length** and **number of followers**.
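
A minimal sketch of such an asynchronous benchmark driver is shown below. This only illustrates the measurement logic, not the repository's actual test code: the service name `raft::LeaderService`, the `AsyncWrite` RPC, and the `ClientWriteRequest`/`ClientWriteResponse` messages are all assumed for the example.

```cpp
#include <chrono>
#include <cstdio>
#include <memory>
#include <grpcpp/grpcpp.h>
#include "raft.grpc.pb.h"  // assumed name of the generated proto header

// Per-call state, kept alive until its completion is drained from the CQ.
struct Call {
    grpc::ClientContext ctx;
    raft::ClientWriteResponse rsp;  // assumed message type
    grpc::Status status;
    std::unique_ptr<grpc::ClientAsyncResponseReader<raft::ClientWriteResponse>> reader;
};

int main() {
    auto channel = grpc::CreateChannel("machineA:10010", grpc::InsecureChannelCredentials());
    auto stub = raft::LeaderService::NewStub(channel);  // assumed service name

    const int kReqCount = 80000;  // --leader_svc_benchmark_req_count
    grpc::CompletionQueue cq;
    const auto start = std::chrono::steady_clock::now();

    // Entrusting thread: fire every request without waiting for replies.
    // (The real test uses a client pool and several polling threads per CQ.)
    for (int i = 0; i < kReqCount; ++i) {
        auto* call = new Call;
        raft::ClientWriteRequest req;  // assumed message type
        req.set_value("x");            // assumed field, matches --value_len=1
        call->reader = stub->AsyncWrite(&call->ctx, req, &cq);  // assumed RPC
        call->reader->Finish(&call->rsp, &call->status, call);
    }

    // Polling loop: drain completions and count the finished requests.
    void* tag = nullptr;
    bool ok = false;
    for (int done = 0; done < kReqCount && cq.Next(&tag, &ok); ++done)
        delete static_cast<Call*>(tag);

    const std::chrono::duration<double> sec = std::chrono::steady_clock::now() - start;
    std::printf("throughput: %.0f req/s\n", kReqCount / sec.count());
    return 0;
}
```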

### About latency:
First of all, the latency of a specific request comprises three parts:
* the round-trip time on the network.
* the time spent waiting to be processed on the server side, especially under heavy load. This is usually implemented with a queue holding the pending requests.
* the business-logic processing time, which in raft usually consists of:
    * appending logs.
    * replicating requests to the majority of the cluster.

If we saturate the server, the time spent on the second part increases drastically, resulting in a high average latency. But that number is meaningless: a latency measurement that includes `part2` cannot truly reflect the real processing ability of the server; all it can tell us is: *Oh, man, the server is already doing its best now...* Any server reaches such a point when it hits its processing limit. So when we talk about latency here, we only focus on the time spent in `part1 + part3`, as the sketch below illustrates.
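
To approximate `part1 + part3` in practice, one can measure with a single request in flight, which keeps the server-side queue empty. A hedged sketch under the same assumed stub and message names as above:

```cpp
// With one request in flight at a time the queueing delay (part2) is ~0,
// so each sample approximates network RTT + business-logic time.
double AvgLatencyMs(raft::LeaderService::Stub& stub, int samples) {
    double total_ms = 0;
    int ok_cnt = 0;
    for (int i = 0; i < samples; ++i) {
        grpc::ClientContext ctx;
        raft::ClientWriteRequest req;   // assumed message type
        raft::ClientWriteResponse rsp;  // assumed message type
        const auto t0 = std::chrono::steady_clock::now();
        const grpc::Status s = stub.Write(&ctx, req, &rsp);  // assumed sync RPC
        const std::chrono::duration<double, std::milli> ms =
            std::chrono::steady_clock::now() - t0;
        if (s.ok()) { total_ms += ms.count(); ++ok_cnt; }
    }
    return ok_cnt ? total_ms / ok_cnt : 0.0;
}
```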

### Result:

@@ -39,17 +49,25 @@ Which is, using asynchronous way of sending 2w requests each time, counting over

![tp-dl](../doc/images/benchmark-throughput-datalen.png)

![latency-dl](../doc/images/benchmark-latency-datalen.png)

#### Taking **number of followers** as the factor:

![F-factor](../doc/images/followers-factor.png)

![tp-#F](../doc/images/benchmark-throughput-followers.png)

![tp-#F](../doc/images/benchmark-latency-followers.png)
The latency result in the Windows-Mac setup is unreliable since the network between the two machines is unstable.

> ##### Result under Linux:
> * Throughput under Linux is generally ~50%-70% higher than the results above.
> * Latency under Linux is stable at 1ms~2ms for all the above cases.
### Bottleneck analysis:
If we remove all the logics leaving only the skeleton(where leader broadcasting requests to all its followers) left, we'll get a result of ~2w throughput & ~500ms latency. After a more detailed investigation, you'll find that it's `UnaryAsyncClient<T,R,Q,CQ>::EntrustRequest()::this->m_reader->Finish` that made the watershed, probably because it would trigger the broadcasting process to the followers which further bring a performance drawback. So the **bottleneck is on the grpc framework itself** under my experiments. Better practices for how to utilize grpc is needed.
First, let's write an example to see the performance when there is no business logic at all, only the bare grpc framework:
* leader: receives requests from the client and broadcasts them to its followers (2 followers here).
* follower: a ping-pong server that does nothing but return an empty message upon receiving a request from the leader.

Leader's code is [here](https://gist.github.com/ppLorins/d72272b6f79c580c25a88a5bb3e489d0).
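
For the follower side, a minimal ping-pong sketch could look like the following (a synchronous server for brevity; only `AppendEntriesRequest`/`AppendEntriesResponse` appear in this commit, so the service name `raft::FollowerService` and the RPC shape are assumptions):

```cpp
#include <memory>
#include <grpcpp/grpcpp.h>
#include "raft.grpc.pb.h"  // assumed name of the generated proto header

// Ping-pong follower: acknowledge every AppendEntries immediately,
// with no log appending, replication, or state-machine work.
class PingPongFollower final : public raft::FollowerService::Service {  // assumed
    grpc::Status AppendEntries(grpc::ServerContext*,
                               const raft::AppendEntriesRequest*,
                               raft::AppendEntriesResponse*) override {
        return grpc::Status::OK;  // return an empty message to the leader
    }
};

int main() {
    PingPongFollower service;
    grpc::ServerBuilder builder;
    // ${port} from the follower config; 10011 is an illustrative value.
    builder.AddListeningPort("0.0.0.0:10011", grpc::InsecureServerCredentials());
    builder.RegisterService(&service);
    std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
    server->Wait();
    return 0;
}
```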

We get a throughput of **~2.0w/s** under the same environment and deployment, so from this experiment we can almost conclude that the **bottleneck is the grpc framework itself**. Better practices for utilizing grpc are still hard to pin down, since grpc is not as performant as one might imagine.


4 changes: 2 additions & 2 deletions doc/developer_guide.md
@@ -17,7 +17,7 @@
* [Lockfree hash](#lockfree-hash)
* [Lockfree deque](#lockfree-deque)
* [Lockfree MPMC queue](#lockfree-mpmc-queue)
* [Lockfree priority queue](#lockfree-priority-queue)
* [Lockfree priority queue](#priorify-queue)
* [Basic workflow](#basic-workflow)
* [Follower workflow](#follower-workflow)
* [Leader workflow](#leader-workflow)
Expand Down Expand Up @@ -387,7 +387,7 @@ The basic idea of consuming is very like that of producing, and keep in mind tha

![deque-pop-1](images/deque-pop-1.png)

The overall process of consuming is trying to move `dummy->next` to its next one by one. Details will not be explained, reference the counterpart of producing, they are basically the same.
The overall process of consuming is to move `dummy->next` forward, one node at a time. The details will not be explained here; consult the producing counterpart, as they are basically the same.

##### 3. About deque's wait-free
This is the tricky part of `LockFreeDeque`: to achieve the [wait-free](https://en.wikipedia.org/wiki/Non-blocking_algorithm) semantic as much as possible, there is no boundary between the nodes being produced and the nodes being consumed. Therefore, a node can be consumed immediately once it appears on the list, regardless of whether the `tail` pointer has passed over it, and a node can likewise be produced again immediately once it becomes producible (empty), and vice versa. This is an aggressive strategy, a double-edged sword: it pushes us closer to `wait-free`, but it also has its own downside: we cannot free a node just after it has been consumed, since the node may still be in use by other threads that are trying to move the `tail` cursor forward. The freeing operation has to be deferred: we first push the consumed node into a `garbage list`; a dedicated thread polls it periodically, and the nodes are physically freed later.
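
A minimal sketch of that deferred-freeing idea is below: a Treiber-style garbage stack plus a dedicated reclaimer thread. The names are illustrative rather than the repository's actual types, and the grace-period bookkeeping that makes `FreeAll()` safe is elided.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

template <typename T>
struct Node {
    T value{};
    std::atomic<Node<T>*> next{nullptr};
};

// Consumed nodes are not freed in place: another thread may still be
// walking across them while moving the tail cursor. Push them here instead.
template <typename T>
class GarbageList {
public:
    void Push(Node<T>* n) {
        Node<T>* head = m_head.load(std::memory_order_relaxed);
        do {
            n->next.store(head, std::memory_order_relaxed);
        } while (!m_head.compare_exchange_weak(head, n,
                    std::memory_order_release, std::memory_order_relaxed));
    }

    // Detach the whole list and physically free it. Safe only once no
    // producer/consumer can still hold a reference (grace period elided).
    void FreeAll() {
        Node<T>* n = m_head.exchange(nullptr, std::memory_order_acquire);
        while (n) {
            Node<T>* next = n->next.load(std::memory_order_relaxed);
            delete n;
            n = next;
        }
    }

private:
    std::atomic<Node<T>*> m_head{nullptr};
};

// The dedicated thread: poll periodically, free later.
template <typename T>
void ReclaimLoop(GarbageList<T>& gl, std::atomic<bool>& stop) {
    while (!stop.load(std::memory_order_relaxed)) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        gl.FreeAll();
    }
}
```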
Binary file modified doc/images/benchmark-throughput-datalen.png
Binary file modified doc/images/benchmark-throughput-followers.png
Binary file modified doc/images/datalen-factor.png
Binary file modified doc/images/followers-factor.png
10 changes: 0 additions & 10 deletions src/binlog/binlog_operator.cc
@@ -69,19 +69,9 @@ void BinLogOperator::Initialize(const char* role, bool file_name) noexcept {

this->m_precede_lcl_inuse.store(0);

//this->m_term_changed.store(false);

this->m_initialized = true;
}

//void BinLogOperator::SetTermMatched() noexcept {
// //this->m_term_changed = true;
//}

//bool BinLogOperator::IsTermMatched() noexcept {
// //return this->m_term_changed;
//}

void BinLogOperator::AddPreLRLUseCount() noexcept {
this->m_precede_lcl_inuse.fetch_add(1);
}
6 changes: 0 additions & 6 deletions src/binlog/binlog_operator.h
@@ -104,10 +104,6 @@ class BinLogOperator final{

void GetOrderedMeta(FileMetaData::TypeOffsetList &_output) noexcept;

//bool IsTermMatched() noexcept;

//void SetTermMatched() noexcept;

void AddPreLRLUseCount() noexcept;

void SubPreLRLUseCount() noexcept;
@@ -135,8 +131,6 @@ class BinLogOperator final{

bool m_initialized = false;

//std::atomic<bool> m_term_changed = false;

std::atomic<LogIdentifier> m_last_logged;

std::string m_file_name = "";
5 changes: 3 additions & 2 deletions src/client/client_impl.cc
@@ -31,12 +31,13 @@ using ::RaftCore::Member::MemberMgr;
using ::RaftCore::Election::ElectionMgr;

AppendEntriesAsyncClient::AppendEntriesAsyncClient(std::shared_ptr<::grpc::Channel> shp_channel,
std::shared_ptr<::grpc::CompletionQueue> shp_cq)
std::shared_ptr<::grpc::CompletionQueue> shp_cq, bool delegate_me)
: UnaryAsyncClient<::raft::AppendEntriesRequest, ::raft::AppendEntriesResponse,
AppendEntriesAsyncClient>(shp_channel, shp_cq) {

//Give myself a long lived delegator.
this->OwnershipDelegator<AppendEntriesAsyncClient>::ResetOwnership(this);
if (delegate_me)
this->OwnershipDelegator<AppendEntriesAsyncClient>::ResetOwnership(this);
}

AppendEntriesAsyncClient::~AppendEntriesAsyncClient() {}
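
The new `delegate_me` switch above toggles a self-ownership idiom: when enabled, the client stores a `shared_ptr` to itself, so copies handed to asynchronous operations keep it alive; pool-managed benchmark clients can pass `false` and let the pool control the lifetime instead. A rough sketch of the idea (illustrative, not the repository's exact `OwnershipDelegator`):

```cpp
#include <memory>

// Mixin that lets a heap-allocated object own a shared_ptr to itself.
template <typename T>
class OwnershipDelegator {
public:
    // Take ownership of 'self' (must have been allocated with new).
    void ResetOwnership(T* self) { m_self.reset(self); }

    // Hand a copy to an async operation so the object outlives the call.
    std::shared_ptr<T> CopyOwnership() const { return m_self; }

    // Drop the self-reference; the object dies with the last copy.
    void ReleaseOwnership() { m_self.reset(); }

private:
    std::shared_ptr<T> m_self;
};

class Client : public OwnershipDelegator<Client> {
public:
    explicit Client(bool delegate_me) {
        if (delegate_me)
            this->ResetOwnership(this);  // "give myself a long lived delegator"
    }
};
```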
2 changes: 1 addition & 1 deletion src/client/client_impl.h
@@ -49,7 +49,7 @@ class AppendEntriesAsyncClient : public UnaryAsyncClient<::raft::AppendEntriesRe
public:

AppendEntriesAsyncClient(std::shared_ptr<::grpc::Channel> shp_channel,
std::shared_ptr<::grpc::CompletionQueue> shp_cq);
std::shared_ptr<::grpc::CompletionQueue> shp_cq, bool delegate_me = true);

virtual ~AppendEntriesAsyncClient();

6 changes: 6 additions & 0 deletions src/common/react_group.cc
@@ -70,5 +70,11 @@ void ReactWorkGroup<T>::ShutDownCQ() noexcept {
this->m_shp_cq->Shutdown();
}

template<typename T>
void ReactWorkGroup<T>::GetThreadId(std::vector<std::thread::id> &ids) noexcept {
for (auto& _thread : this->m_vec_threads)
ids.emplace_back(_thread->get_id());
}

}

2 changes: 2 additions & 0 deletions src/common/react_group.h
@@ -59,6 +59,8 @@ class ReactWorkGroup {

void ShutDownCQ() noexcept;

void GetThreadId(std::vector<std::thread::id> &ids) noexcept;

private:

void GrpcPollingThread() noexcept;
15 changes: 10 additions & 5 deletions src/config/config.cc
@@ -24,8 +24,8 @@ namespace RaftCore::Config {
DEFINE_uint32(notify_cq_threads, 4, "#threads polling on each notify CQ.");
DEFINE_uint32(call_cq_num, 2, "#call CQ the server use.");
DEFINE_uint32(call_cq_threads, 2, "#threads polling on each call CQ.");
DEFINE_uint32(client_cq_num, 2, "#completion queues dedicated for process backend RPCs in the leader.");
DEFINE_uint32(client_thread_num, 2, "#threads for each client CQ.");
DEFINE_uint32(client_thread_num, 2, "#threads polling on each client CQ.");

DEFINE_uint32(request_pool_size, 100, "#call data instance each thread hold.");

DEFINE_uint32(binlog_append_file_timeo_us, 1500, "append binlog cv wait timeout in microseconds .");
@@ -112,18 +112,23 @@ namespace RaftCore::Config {
DEFINE_bool(clear_existing_sstable_files, true, "whether delete all existing sstable files or not.");
DEFINE_uint32(hash_slot_num, 500, "#slots in lockfree hash.");
DEFINE_uint32(resync_log_start_idx, 8057, "#log start index for the LeaderView::ResyncLog interface.");
DEFINE_uint32(deque_push_count, 100000, "#elements pushed before testing.");
DEFINE_uint32(deque_op_count, 100000, "#operations for lockfree deque testing.");
DEFINE_uint32(meta_count, 80000, "#meta items for testing memory usage.");
DEFINE_uint32(follower_svc_benchmark_req_round, 10000, "#rounds(phaseI+phaseII) of requests sent during follower service benchmarking.");
DEFINE_uint32(leader_svc_benchmark_req_count, 10000, "#requests sent during leader service benchmarking.");
DEFINE_uint32(benchmark_client_cq_num, 2, "#CQ client used to trigger the requests.");
DEFINE_uint32(benchmark_client_thread_num_per_cq, 4, "#threads client per CQ used to trigger the requests.");
DEFINE_uint32(benchmark_client_polling_thread_num_per_cq, 4, "#threads client per CQ used to trigger the requests.");
DEFINE_uint32(client_write_timo_ms, 50, "timeout value(ms) for client writing.");
DEFINE_bool(benchmark_client_split_entrusting, true, "whether to split the benchmark client entrusing process.");
DEFINE_uint32(benchmark_client_entrusting_thread_num, 1, ".");
DEFINE_string(target_ip, "default_none", "the target ip for a new benchmark server.");
DEFINE_string(my_ip, "default_none", "the ip addr to indicate myself in client req.");
DEFINE_uint32(storage_get_slice_count, 10, "#elements get from get_slice().");
DEFINE_uint32(retain_num_unordered_single_list, 100, "retain num for unordered_single_list unit test.");
DEFINE_bool(do_commit, false, "whether to issue the commit request or not after appendEntries.");
DEFINE_uint32(value_len, 2, "value length in unit test.");
DEFINE_uint32(client_count, 10000, "test client count.");
DEFINE_uint32(launch_threads_num, 0, "#threads in data structures benchmark.");
DEFINE_uint32(queue_initial_size, 0, "initial size for lockfree queue in unit test.");
DEFINE_uint32(queue_op_count, 1000000, "#operations for lockfree queue unit test.");
DEFINE_uint32(conn_op_count, 100000, "#operations for follower unit test.");
}
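
These knobs are ordinary gflags: a `DEFINE_*` in this translation unit paired with a `DECLARE_*` in `config.h` (next diff) makes the flag settable from command lines like the ones shown in `doc/benchmark.md`. A minimal usage sketch (the binary name is illustrative):

```cpp
#include <cstdio>
#include <gflags/gflags.h>

DEFINE_uint32(leader_svc_benchmark_req_count, 10000,
              "#requests sent during leader service benchmarking.");

int main(int argc, char* argv[]) {
    // Consumes recognized --flag=value arguments, e.g.
    //   ./raft_test --leader_svc_benchmark_req_count=80000
    gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
    std::printf("req count: %u\n", FLAGS_leader_svc_benchmark_req_count);
    return 0;
}
```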
13 changes: 8 additions & 5 deletions src/config/config.h
@@ -31,7 +31,6 @@ namespace RaftCore::Config {
DECLARE_uint32(notify_cq_threads);
DECLARE_uint32(call_cq_num);
DECLARE_uint32(call_cq_threads);
DECLARE_uint32(client_cq_num);
DECLARE_uint32(client_thread_num);
DECLARE_uint32(request_pool_size);

@@ -132,21 +131,25 @@ namespace RaftCore::Config {
DECLARE_bool(clear_existing_sstable_files);
DECLARE_uint32(hash_slot_num);
DECLARE_uint32(resync_log_start_idx);
DECLARE_uint32(deque_push_count);
DECLARE_uint32(deque_op_count);
DECLARE_uint32(meta_count);
DECLARE_uint32(follower_svc_benchmark_req_round);
DECLARE_uint32(leader_svc_benchmark_req_count);
DECLARE_uint32(benchmark_client_cq_num);
DECLARE_uint32(benchmark_client_thread_num_per_cq);
DECLARE_uint32(benchmark_client_polling_thread_num_per_cq);
DECLARE_uint32(benchmark_client_entrusting_thread_num);
DECLARE_uint32(client_write_timo_ms);
DECLARE_bool(benchmark_client_split_entrusting);
DECLARE_string(target_ip);
DECLARE_string(my_ip);
DECLARE_uint32(storage_get_slice_count);
DECLARE_uint32(retain_num_unordered_single_list);
DECLARE_bool(do_commit);
DECLARE_uint32(value_len);

DECLARE_uint32(client_count);
DECLARE_uint32(launch_threads_num);
DECLARE_uint32(queue_initial_size);
DECLARE_uint32(queue_op_count);
DECLARE_uint32(conn_op_count);
}

#endif