From 755aa22169cfdd460010a5393fb2b6ab58fb6aaf Mon Sep 17 00:00:00 2001 From: Yoshiaki Nishimura Date: Thu, 20 Feb 2025 21:38:50 +0900 Subject: [PATCH] add max_result_set_writers and excess check --- examples/service_benchmark/main.cpp | 1 + include/jogasaki/configuration.h | 10 +++++++++ src/jogasaki/api/impl/database.cpp | 5 +++-- src/jogasaki/api/resource/bridge.cpp | 31 ++++++++++++++++++++++++---- src/jogasaki/plan/compiler.cpp | 13 +++++++++--- 5 files changed, 51 insertions(+), 9 deletions(-) diff --git a/examples/service_benchmark/main.cpp b/examples/service_benchmark/main.cpp index ebe4e292..b1a7a7c4 100644 --- a/examples/service_benchmark/main.cpp +++ b/examples/service_benchmark/main.cpp @@ -114,6 +114,7 @@ DEFINE_bool(secondary, false, "use secondary index"); //NOLINT DEFINE_int64(scan_block_size, 100, "max records processed by scan operator before yielding to other tasks"); //NOLINT DEFINE_int64(scan_yield_interval, 1, "max time (ms) processed by scan operator before yielding to other tasks"); //NOLINT DEFINE_int64(scan_default_parallel, 1, "max parallel execution count of scan tasks"); //NOLINT +DEFINE_int64(max_result_set_writers, 64, "max number of result set writers"); //NOLINT namespace tateyama::service_benchmark { diff --git a/include/jogasaki/configuration.h b/include/jogasaki/configuration.h index 70098f0c..3aba05ce 100644 --- a/include/jogasaki/configuration.h +++ b/include/jogasaki/configuration.h @@ -531,6 +531,14 @@ class configuration { enable_blob_cast_ = arg; } + [[nodiscard]] std::size_t max_result_set_writers() const noexcept { + return max_result_set_writers_; + } + + void max_result_set_writers(std::size_t arg) noexcept { + max_result_set_writers_ = arg; + } + friend inline std::ostream& operator<<(std::ostream& out, configuration const& cfg) { //NOLINTBEGIN @@ -592,6 +600,7 @@ class configuration { print_non_default(key_distribution); print_non_default(mock_datastore); print_non_default(enable_blob_cast); + 
print_non_default(max_result_set_writers); if(cfg.req_cancel_config()) { out << "req_cancel_config:" << *cfg.req_cancel_config() << " "; \ @@ -657,6 +666,7 @@ class configuration { key_distribution_kind key_distribution_{key_distribution_kind::uniform}; bool mock_datastore_ = false; bool enable_blob_cast_ = false; + std::size_t max_result_set_writers_ = 64; }; } // namespace jogasaki diff --git a/src/jogasaki/api/impl/database.cpp b/src/jogasaki/api/impl/database.cpp index e5fe5430..07072662 100644 --- a/src/jogasaki/api/impl/database.cpp +++ b/src/jogasaki/api/impl/database.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -187,6 +187,7 @@ void dump_public_configurations(configuration const& cfg) { LOGCFG << "(enable_join_scan) " << cfg.enable_join_scan() << " : whether to enable index join using join_scan operator"; LOGCFG << "(dev_rtx_key_distribution) " << cfg.key_distribution() << " : key distribution policy used for RTX parallel scan"; LOGCFG << "(dev_enable_blob_cast) " << cfg.enable_blob_cast() << " : whether to enable cast expression to/from blob/clob data"; + LOGCFG << "(max_result_set_writers) " << cfg.max_result_set_writers() << " : max number of result set writers"; } status database::start() { @@ -1381,4 +1382,4 @@ std::unique_ptr create_database(std::shared_ptr c std::unique_ptr create_database(std::shared_ptr cfg, sharksfin::DatabaseHandle db) { return std::make_unique(std::move(cfg), db); } -} +} // namespace jogasaki::api::impl diff --git a/src/jogasaki/api/resource/bridge.cpp b/src/jogasaki/api/resource/bridge.cpp index 8f4b4aa6..b243acca 100644 --- a/src/jogasaki/api/resource/bridge.cpp +++ b/src/jogasaki/api/resource/bridge.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 Project Tsurugi. + * Copyright 2018-2025 Project Tsurugi.
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -159,8 +159,24 @@ bool process_sql_config(std::shared_ptr& ret, tateyama: if (auto v = jogasaki_config->get("prepare_analytics_benchmark_tables")) { ret->prepare_analytics_benchmark_tables(v.value()); } + if (auto v = jogasaki_config->get("max_result_set_writers")) { + const std::size_t writers = v.value(); + if (writers > 256) { /* 256 itself is accepted; only larger values are rejected */ + LOG_LP(ERROR) << "Too large max_result_set_writers (" << writers + << ") given. It must not exceed 256."; + return false; + } + ret->max_result_set_writers(writers); + } if (auto v = jogasaki_config->get("default_partitions")) { - ret->default_partitions(v.value()); + const std::size_t partitions = v.value(); + if (partitions > ret->max_result_set_writers()) { + LOG_LP(ERROR) << "Too large default_partitions (" << partitions + << ") given. It must not exceed max_result_set_writers (" + << ret->max_result_set_writers() << ")."; + return false; + } + ret->default_partitions(partitions); } if (auto v = jogasaki_config->get("stealing_enabled")) { ret->stealing_enabled(v.value()); @@ -258,7 +274,14 @@ bool process_sql_config(std::shared_ptr& ret, tateyama: ret->direct_commit_callback(v.value()); } if (auto v = jogasaki_config->get("scan_default_parallel")) { - ret->scan_default_parallel(v.value()); + const std::size_t parallel = v.value(); + if (parallel > ret->max_result_set_writers()) { + LOG_LP(ERROR) << "Too large scan_default_parallel (" << parallel + << ") given. 
It must not exceed max_result_set_writers (" + << ret->max_result_set_writers() << ")."; + return false; + } + ret->scan_default_parallel(parallel); } if (auto v = jogasaki_config->get("dev_inplace_teardown")) { ret->inplace_teardown(v.value()); @@ -319,5 +342,5 @@ std::shared_ptr convert_config(tateyama::api::configura } } -} +} // namespace jogasaki::api::resource diff --git a/src/jogasaki/plan/compiler.cpp b/src/jogasaki/plan/compiler.cpp index 855ac4fb..a6febea1 100644 --- a/src/jogasaki/plan/compiler.cpp +++ b/src/jogasaki/plan/compiler.cpp @@ -1214,12 +1214,19 @@ size_t intermediate_calculate_partition(takatori::plan::step const& s) noexcept return sum; } size_t calculate_partition(takatori::plan::step const& s) noexcept { - auto& process = unsafe_downcast(s); + auto& process = unsafe_downcast(s); + auto partition = global::config_pool()->default_partitions(); if (!process.downstreams().empty()) { VLOG_LP(log_error) << "The bottom of graph_type must not have downstreams"; - return global::config_pool()->default_partitions(); + } else { + partition = intermediate_calculate_partition(s); } - return intermediate_calculate_partition(s); + if (partition > global::config_pool()->max_result_set_writers()) { /* excess is only logged; partition is returned unchanged */ + LOG_LP(ERROR) << "The result calculated by calculate_partition(" << partition + << ") exceeded max_result_set_writers(" + << global::config_pool()->max_result_set_writers() << ")"; + } + return partition; } } // namespace impl