Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

One thread per disk in compaction #1161

Merged
merged 2 commits into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ namespace config {
CONF_Int32(base_compaction_check_interval_seconds, "60");
CONF_Int64(base_compaction_num_cumulative_deltas, "5");
CONF_Int32(base_compaction_num_threads, "1");
CONF_Int32(base_compaction_num_threads_per_disk, "1");
CONF_Double(base_cumulative_delta_ratio, "0.3");
CONF_Int64(base_compaction_interval_seconds_since_last_operation, "604800");
CONF_Int32(base_compaction_write_mbytes_per_sec, "5");
Expand All @@ -245,6 +246,7 @@ namespace config {
CONF_Int32(cumulative_compaction_check_interval_seconds, "10");
CONF_Int64(cumulative_compaction_num_singleton_deltas, "5");
CONF_Int32(cumulative_compaction_num_threads, "1");
CONF_Int32(cumulative_compaction_num_threads_per_disk, "1");
CONF_Int64(cumulative_compaction_budgeted_bytes, "104857600");
CONF_Int32(cumulative_compaction_write_mbytes_per_sec, "100");

Expand Down
13 changes: 7 additions & 6 deletions be/src/olap/olap_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1833,8 +1833,8 @@ void OLAPEngine::start_clean_fd_cache() {
VLOG(10) << "end clean file descritpor cache";
}

void OLAPEngine::perform_cumulative_compaction() {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION);
void OLAPEngine::perform_cumulative_compaction(OlapStore* store) {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION, store);
if (best_table == nullptr) { return; }

DorisMetrics::cumulative_compaction_request_total.increment(1);
Expand Down Expand Up @@ -1863,8 +1863,8 @@ void OLAPEngine::perform_cumulative_compaction() {
best_table->set_last_compaction_failure_time(0);
}

void OLAPEngine::perform_base_compaction() {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION);
void OLAPEngine::perform_base_compaction(OlapStore* store) {
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION, store);
if (best_table == nullptr) { return; }

DorisMetrics::base_compaction_request_total.increment(1);
Expand Down Expand Up @@ -1893,14 +1893,15 @@ void OLAPEngine::perform_base_compaction() {
best_table->set_last_compaction_failure_time(0);
}

OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type) {
OLAPTablePtr OLAPEngine::_find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store) {
ReadLock tablet_map_rdlock(&_tablet_map_lock);
uint32_t highest_score = 0;
OLAPTablePtr best_table;
int64_t now = UnixMillis();
for (tablet_map_t::value_type& table_ins : _tablet_map){
for (OLAPTablePtr& table_ptr : table_ins.second.table_arr) {
if (!table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) {
if (table_ptr->store()->path_hash() != store->path_hash()
|| !table_ptr->is_used() || !table_ptr->is_loaded() || !_can_do_compaction(table_ptr)) {
continue;
}

Expand Down
10 changes: 5 additions & 5 deletions be/src/olap/olap_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ class OLAPEngine {
OLAPStatus clear();

void start_clean_fd_cache();
void perform_cumulative_compaction();
void perform_base_compaction();
void perform_cumulative_compaction(OlapStore* store);
void perform_base_compaction(OlapStore* store);

// 获取cache的使用情况信息
void get_cache_status(rapidjson::Document* document) const;
Expand Down Expand Up @@ -531,7 +531,7 @@ class OLAPEngine {

OLAPStatus _check_existed_or_else_create_dir(const std::string& path);

OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type);
OLAPTablePtr _find_best_tablet_to_compaction(CompactionType compaction_type, OlapStore* store);
bool _can_do_compaction(OLAPTablePtr table);

void _cancel_unfinished_schema_change();
Expand Down Expand Up @@ -588,7 +588,7 @@ class OLAPEngine {
// Thread functions

// base compaction thread process function
void* _base_compaction_thread_callback(void* arg);
void* _base_compaction_thread_callback(void* arg, OlapStore* store);

// garbage sweep thread process function. clear snapshot and trash folder
void* _garbage_sweeper_thread_callback(void* arg);
Expand All @@ -600,7 +600,7 @@ class OLAPEngine {
void* _unused_index_thread_callback(void* arg);

// cumulative process function
void* _cumulative_compaction_thread_callback(void* arg);
void* _cumulative_compaction_thread_callback(void* arg, OlapStore* store);

// clean file descriptors cache
void* _fd_cache_clean_callback(void* arg);
Expand Down
29 changes: 18 additions & 11 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,30 @@ OLAPStatus OLAPEngine::_start_bg_worker() {
[this] {
_unused_index_thread_callback(nullptr);
});


uint32_t file_system_num = get_file_system_count();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to get_store_num() is better

// convert store map to vector
std::vector<OlapStore*> store_vec;
for (auto& tmp_store : _store_map) {
store_vec.push_back(tmp_store.second);
}
int32_t store_num = store_vec.size();
// start be and ce threads for merge data
int32_t base_compaction_num_threads = config::base_compaction_num_threads;
int32_t base_compaction_num_threads = config::base_compaction_num_threads_per_disk * file_system_num;
_base_compaction_threads.reserve(base_compaction_num_threads);
for (uint32_t i = 0; i < base_compaction_num_threads; ++i) {
_base_compaction_threads.emplace_back(
[this] {
_base_compaction_thread_callback(nullptr);
[this, store_num, store_vec, i] {
_base_compaction_thread_callback(nullptr, store_vec[i % store_num]);
});
}

int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads;
int32_t cumulative_compaction_num_threads = config::cumulative_compaction_num_threads_per_disk * file_system_num;
_cumulative_compaction_threads.reserve(cumulative_compaction_num_threads);
for (uint32_t i = 0; i < cumulative_compaction_num_threads; ++i) {
_cumulative_compaction_threads.emplace_back(
[this] {
_cumulative_compaction_thread_callback(nullptr);
[this, store_num, store_vec, i] {
_cumulative_compaction_thread_callback(nullptr, store_vec[i % store_num]);
});
}

Expand Down Expand Up @@ -104,7 +111,7 @@ void* OLAPEngine::_fd_cache_clean_callback(void* arg) {
return NULL;
}

void* OLAPEngine::_base_compaction_thread_callback(void* arg) {
void* OLAPEngine::_base_compaction_thread_callback(void* arg, OlapStore* store) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
Expand All @@ -122,7 +129,7 @@ void* OLAPEngine::_base_compaction_thread_callback(void* arg) {
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_base_compaction();
perform_base_compaction(store);

usleep(interval * 1000000);
}
Expand Down Expand Up @@ -218,7 +225,7 @@ void* OLAPEngine::_unused_index_thread_callback(void* arg) {
return NULL;
}

void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) {
void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg, OlapStore* store) {
#ifdef GOOGLE_PROFILER
ProfilerRegisterThread();
#endif
Expand All @@ -235,7 +242,7 @@ void* OLAPEngine::_cumulative_compaction_thread_callback(void* arg) {
// cgroup is not initialized at this time
// add tid to cgroup
CgroupsMgr::apply_system_cgroup();
perform_cumulative_compaction();
perform_cumulative_compaction(store);
usleep(interval * 1000000);
}

Expand Down