Skip to content

Commit

Permalink
[fix][coordinator] Expand job map initial bucket size.
Browse files Browse the repository at this point in the history
If the initial bucket size is too small, there will be a performance
issue when many jobs generated in seconds.

Signed-off-by: Ketor <[email protected]>
  • Loading branch information
ketor committed Jan 23, 2025
1 parent d6a9d25 commit 798deef
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 23 deletions.
36 changes: 18 additions & 18 deletions src/coordinator/coordinator_control.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,26 +92,26 @@ CoordinatorControl::CoordinatorControl(std::shared_ptr<MetaReader> meta_reader,
&table_index_map_, kPrefixTableIndex, raw_engine_of_meta);

// init SafeMap
id_epoch_map_.Init(100); // id_epoch_map_ is a small map
id_epoch_map_safe_temp_.Init(100); // id_epoch_map_temp_ is a small map
schema_name_map_safe_temp_.Init(1000); // schema_map_ is a big map
table_name_map_safe_temp_.Init(10000); // table_map_ is a big map
store_operation_map_.Init(100); // store_operation_map_ is a small map
region_cmd_map_.Init(2000); // region_cmd_map_ is a big map
schema_map_.Init(10000); // schema_map_ is a big map
table_map_.Init(10000); // table_map_ is a big map
region_map_.Init(30000); // region_map_ is a big map
region_metrics_map_.Init(30000); // region_metrics_map_ is a big map
coordinator_map_.Init(10); // coordinator_map_ is a small map
store_map_.Init(100); // store_map_ is a small map
table_metrics_map_.Init(10000); // table_metrics_map_ is a big map
job_map_.Init(100); // job_map_ is a small map
index_name_map_safe_temp_.Init(10000); // index_map_ is a big map
index_map_.Init(10000); // index_map_ is a big map
index_metrics_map_.Init(10000); // index_metrics_map_ is a big map
id_epoch_map_.Init(100); // id_epoch_map_ is a small map
id_epoch_map_safe_temp_.Init(100); // id_epoch_map_temp_ is a small map
schema_name_map_safe_temp_.Init(10000); // schema_map_ is a big map
table_name_map_safe_temp_.Init(100000); // table_map_ is a big map
store_operation_map_.Init(10000); // store_operation_map_ is a big map
region_cmd_map_.Init(100000); // region_cmd_map_ is a big map
schema_map_.Init(10000); // schema_map_ is a big map
table_map_.Init(100000); // table_map_ is a big map
region_map_.Init(500000); // region_map_ is a big map
region_metrics_map_.Init(500000); // region_metrics_map_ is a big map
coordinator_map_.Init(100); // coordinator_map_ is a small map
store_map_.Init(200); // store_map_ is a small map
table_metrics_map_.Init(100000); // table_metrics_map_ is a big map
job_map_.Init(100000); // job_map_ is a big map
index_name_map_safe_temp_.Init(100000); // index_map_ is a big map
index_map_.Init(100000); // index_map_ is a big map
index_metrics_map_.Init(100000); // index_metrics_map_ is a big map

// table index
table_index_map_.Init(10000);
table_index_map_.Init(100000);

// tenant
tenant_map_.Init(1000); // tenant_map_ is a small map
Expand Down
13 changes: 8 additions & 5 deletions src/coordinator/coordinator_control_coor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3637,7 +3637,7 @@ butil::Status CoordinatorControl::CreateRegionWithJob(std::vector<pb::coordinato
butil::Status CoordinatorControl::ValidateJobConflict(int64_t region_id, int64_t second_region_id) {
// check job conflict
butil::FlatMap<int64_t, pb::coordinator::Job> temp_job_map;
temp_job_map.init(1000);
temp_job_map.init(10000);
int ret = job_map_.GetRawMapCopy(temp_job_map);
if (ret < 0) {
DINGO_LOG(ERROR) << "ValidateJobConflict job_map_.GetRawMapCopy "
Expand Down Expand Up @@ -3681,7 +3681,7 @@ butil::Status CoordinatorControl::ValidateJobConflict(int64_t region_id, int64_t

// check store operation conflict
butil::FlatMap<int64_t, pb::coordinator_internal::StoreOperationInternal> store_operation_map_temp;
store_operation_map_temp.init(1000);
store_operation_map_temp.init(10000);
ret = store_operation_map_.GetRawMapCopy(store_operation_map_temp);
if (ret < 0) {
DINGO_LOG(ERROR) << "ValidateJobConflict store_operation_map_.GetRawMapCopy "
Expand Down Expand Up @@ -4622,9 +4622,9 @@ void CoordinatorControl::UpdateRegionMapAndStoreOperation(const pb::common::Stor
region_to_update.definition().range().end_key() != region_metrics.region_definition().range().end_key()) {
DINGO_LOG(INFO) << "region range change region_id = " << region_metrics.id() << " old range = ["
<< Helper::StringToHex(region_to_update.definition().range().start_key()) << ", "
<< Helper::StringToHex(region_to_update.definition().range().end_key()) << ")"
<< " new range = [" << Helper::StringToHex(region_metrics.region_definition().range().start_key())
<< ", " << Helper::StringToHex(region_metrics.region_definition().range().end_key()) << ")";
<< Helper::StringToHex(region_to_update.definition().range().end_key()) << ")" << " new range = ["
<< Helper::StringToHex(region_metrics.region_definition().range().start_key()) << ", "
<< Helper::StringToHex(region_metrics.region_definition().range().end_key()) << ")";
if (!leader_has_old_epoch) {
need_update_region_metrics = true;
}
Expand Down Expand Up @@ -6627,6 +6627,9 @@ butil::Status CoordinatorControl::RpcSendPushStoreOperation(const pb::common::Lo

pb::push::PushService_Stub(&channel).PushStoreOperation(&cntl, &request, &response, nullptr);

DINGO_LOG(INFO) << fmt::format("[joblist] send store_operation to store:{}, request:{}",
location.ShortDebugString(), request.ShortDebugString());

if (cntl.Failed()) {
DINGO_LOG(ERROR) << fmt::format("[joblist] rpc failed, will retry, error code:{}, error message:{}",
cntl.ErrorCode(), cntl.ErrorText());
Expand Down

0 comments on commit 798deef

Please sign in to comment.