Skip to content

Commit

Permalink
add max_result_set_writers and excess check
Browse files — browse the repository at this point in the history
  • Loading branch information
YoshiakiNishimura committed Feb 20, 2025
1 parent 1984169 commit c613cfa
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 9 deletions.
1 change: 1 addition & 0 deletions examples/service_benchmark/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ DEFINE_bool(secondary, false, "use secondary index"); //NOLINT
DEFINE_int64(scan_block_size, 100, "max records processed by scan operator before yielding to other tasks"); //NOLINT
DEFINE_int64(scan_yield_interval, 1, "max time (ms) processed by scan operator before yielding to other tasks"); //NOLINT
DEFINE_int64(scan_default_parallel, 1, "max parallel execution count of scan tasks"); //NOLINT
DEFINE_int64(max_result_set_writers, 64, "max number of result set writers"); //NOLINT

namespace tateyama::service_benchmark {

Expand Down
10 changes: 10 additions & 0 deletions include/jogasaki/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,14 @@ class configuration {
enable_blob_cast_ = arg;
}

/**
 * @brief accessor for the maximum number of result set writers
 * @return the configured upper bound on result set writers
 */
[[nodiscard]] std::size_t max_result_set_writers() const noexcept { return max_result_set_writers_; }

/**
 * @brief setter for the maximum number of result set writers
 * @param value the new upper bound on result set writers
 */
void max_result_set_writers(std::size_t value) noexcept {
    max_result_set_writers_ = value;
}

friend inline std::ostream& operator<<(std::ostream& out, configuration const& cfg) {

//NOLINTBEGIN
Expand Down Expand Up @@ -592,6 +600,7 @@ class configuration {
print_non_default(key_distribution);
print_non_default(mock_datastore);
print_non_default(enable_blob_cast);
print_non_default(max_result_set_writers);

if(cfg.req_cancel_config()) {
out << "req_cancel_config:" << *cfg.req_cancel_config() << " "; \
Expand Down Expand Up @@ -657,6 +666,7 @@ class configuration {
key_distribution_kind key_distribution_{key_distribution_kind::uniform};
bool mock_datastore_ = false;
bool enable_blob_cast_ = false;
std::size_t max_result_set_writers_ = 64;
};

} // namespace jogasaki
5 changes: 3 additions & 2 deletions src/jogasaki/api/impl/database.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -187,6 +187,7 @@ void dump_public_configurations(configuration const& cfg) {
LOGCFG << "(enable_join_scan) " << cfg.enable_join_scan() << " : whether to enable index join using join_scan operator";
LOGCFG << "(dev_rtx_key_distribution) " << cfg.key_distribution() << " : key distribution policy used for RTX parallel scan";
LOGCFG << "(dev_enable_blob_cast) " << cfg.enable_blob_cast() << " : whether to enable cast expression to/from blob/clob data";
LOGCFG << "(max_result_set_writers) " << cfg.max_result_set_writers() << " : max number of result set writers";
}

status database::start() {
Expand Down Expand Up @@ -1381,4 +1382,4 @@ std::unique_ptr<database> create_database(std::shared_ptr<class configuration> c
/**
 * @brief factory function creating a database API object backed by an existing datastore handle
 * @param cfg the configuration shared with the created database (ownership transferred by move)
 * @param db the sharksfin database handle the implementation wraps
 * @return the newly created database instance
 */
std::unique_ptr<database> create_database(std::shared_ptr<configuration> cfg, sharksfin::DatabaseHandle db) {
    auto instance = std::make_unique<impl::database>(std::move(cfg), db);
    return instance;
}
}
} // namespace jogasaki::api::impl
28 changes: 24 additions & 4 deletions src/jogasaki/api/resource/bridge.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2025 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -159,8 +159,22 @@ bool process_sql_config(std::shared_ptr<jogasaki::configuration>& ret, tateyama:
if (auto v = jogasaki_config->get<bool>("prepare_analytics_benchmark_tables")) {
ret->prepare_analytics_benchmark_tables(v.value());
}
if (auto v = jogasaki_config->get<std::size_t>("max_result_set_writers")) {
const std::size_t writers = v.value();
if (writers > 256) {
LOG_LP(ERROR) << "Too large max_result_set_writers (" << writers
<< ") given. It must be smaller than 256.";
}
ret->max_result_set_writers(writers);
}
if (auto v = jogasaki_config->get<std::size_t>("default_partitions")) {
ret->default_partitions(v.value());
const std::size_t partitions = v.value();
if (partitions > ret->max_result_set_writers()) {
LOG_LP(ERROR) << "Too large default_partitions (" << partitions
<< ") given. It must be smaller than max_result_set_writers ("
<< ret->max_result_set_writers() << ").";
}
ret->default_partitions(partitions);
}
if (auto v = jogasaki_config->get<bool>("stealing_enabled")) {
ret->stealing_enabled(v.value());
Expand Down Expand Up @@ -258,7 +272,13 @@ bool process_sql_config(std::shared_ptr<jogasaki::configuration>& ret, tateyama:
ret->direct_commit_callback(v.value());
}
if (auto v = jogasaki_config->get<std::size_t>("scan_default_parallel")) {
ret->scan_default_parallel(v.value());
const std::size_t parallel = v.value();
if (parallel > ret->max_result_set_writers()) {
LOG_LP(ERROR) << "Too large scan_default_parallel (" << parallel
<< ") given. It must be smaller than max_result_set_writers ("
<< ret->max_result_set_writers() << ").";
}
ret->scan_default_parallel(parallel);
}
if (auto v = jogasaki_config->get<bool>("dev_inplace_teardown")) {
ret->inplace_teardown(v.value());
Expand Down Expand Up @@ -319,5 +339,5 @@ std::shared_ptr<jogasaki::configuration> convert_config(tateyama::api::configura
}
}

}
} // namespace jogasaki::api::resource

13 changes: 10 additions & 3 deletions src/jogasaki/plan/compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1214,12 +1214,19 @@ size_t intermediate_calculate_partition(takatori::plan::step const& s) noexcept
return sum;
}
/**
 * @brief calculate the partition count for the bottom step of the plan graph
 * @details the step is expected to be a process with no downstream steps; if it
 * unexpectedly has downstreams, an error is logged and the configured default
 * partition count is used. When the calculated partition count exceeds
 * max_result_set_writers, an error is logged (the value is still returned —
 * NOTE(review): presumably the caller clamps or rejects it; confirm upstream).
 * @param s the plan step to evaluate (must be a takatori::plan::process)
 * @return the calculated partition count
 */
size_t calculate_partition(takatori::plan::step const& s) noexcept {
    // NOTE: the scraped diff retained a duplicated declaration of `process` and a
    // stale early `return intermediate_calculate_partition(s);` that made the
    // excess check below unreachable; both removed here.
    auto& process = unsafe_downcast<takatori::plan::process>(s);
    auto partition = global::config_pool()->default_partitions();
    if (!process.downstreams().empty()) {
        VLOG_LP(log_error) << "The bottom of graph_type must not have downstreams";
    } else {
        partition = intermediate_calculate_partition(s);
    }
    // warn when the result would require more writers than the configured limit
    if (partition > global::config_pool()->max_result_set_writers()) {
        LOG_LP(ERROR) << "The result calculated by calculate_partition(" << partition
            << ") exceeded max_result_set_writers("
            << global::config_pool()->max_result_set_writers() << ")";
    }
    return partition;
}

} // namespace impl
Expand Down

0 comments on commit c613cfa

Please sign in to comment.