Skip to content

Commit

Permalink
add optional parameter, writer_count, to response::acquire_channel() …
Browse files Browse the repository at this point in the history
…and eliminate unnecessary functions
  • Loading branch information
t-horikawa committed Jan 17, 2025
1 parent f01c7ef commit 0489be5
Show file tree
Hide file tree
Showing 26 changed files with 62 additions and 73 deletions.
2 changes: 1 addition & 1 deletion bench/data_channel_write/data_channel_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class data_channel_write_service: public server_service_base {
std::string payload { req->payload() };
//
std::shared_ptr<tateyama::api::server::data_channel> channel;
ASSERT_OK(res->acquire_channel(payload, channel));
ASSERT_OK(res->acquire_channel(payload, channel, default_writer_count));
std::shared_ptr<tateyama::api::server::writer> writer;
ASSERT_OK(channel->acquire(writer));
//
Expand Down
3 changes: 2 additions & 1 deletion include/tateyama/api/server/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,13 @@ class response {
* @brief retrieve output data channel
* @param name the name of the output
* @param ch [out] the data channel for the given name
* @param writer_count the numver of the writers used in the output
* @detail this function provides the named data channel for the application output
* @note this function is thread-safe and multiple threads can invoke simultaneously.
* @return status::ok when successful
* @return other code when error occurs
*/
virtual status acquire_channel(std::string_view name, std::shared_ptr<data_channel>& ch) = 0;
virtual status acquire_channel(std::string_view name, std::shared_ptr<data_channel>& ch, std::size_t writer_count) = 0;

/**
* @brief release the data channel
Expand Down
15 changes: 1 addition & 14 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,6 @@ class ipc_listener : public tateyama::endpoint::common::listener_common {
ipc_metrics_.set_memory_parameters(connection_container::fixed_memory_size(threads + admin_sessions),
server_wire_container_impl::proportional_memory_size(datachannel_buffer_size_, max_datachannel_buffers_));

// set maximum writer count from sql.default_partitions
auto* sql_config = cfg_->get_section("sql");
if (sql_config == nullptr) {
throw std::runtime_error("cannot find sql section in the configuration");
}
auto default_partitions_opt = sql_config->get<std::size_t>("default_partitions");
if (default_partitions_opt) {
if (writer_count_ < default_partitions_opt.value()) {
writer_count_ = default_partitions_opt.value();
}
}

// output configuration to be used
LOG(INFO) << tateyama::endpoint::common::ipc_endpoint_config_prefix
<< "database_name: " << database_name_ << ", "
Expand Down Expand Up @@ -207,7 +195,7 @@ class ipc_listener : public tateyama::endpoint::common::listener_common {

auto& worker_entry = workers_.at(slot_index);
std::unique_lock<std::mutex> lock(mtx_workers_);
worker_entry = std::make_shared<ipc_worker>(*router_, conf_, session_id, std::move(wire), status_->database_info(), writer_count_);
worker_entry = std::make_shared<ipc_worker>(*router_, conf_, session_id, std::move(wire), status_->database_info());
connection_queue.accept(slot_id, session_id);
ipc_metrics_.increase();
worker_entry->invoke([this, slot_id, slot_index, &connection_queue]{
Expand Down Expand Up @@ -294,7 +282,6 @@ class ipc_listener : public tateyama::endpoint::common::listener_common {
std::string proc_mutex_file_;
std::size_t datachannel_buffer_size_{};
std::size_t max_datachannel_buffers_{};
std::size_t writer_count_{ipc_response::default_writer_count};
std::mutex mtx_workers_{};
std::mutex mtx_undertakers_{};

Expand Down
8 changes: 4 additions & 4 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
}

ipc_request request_obj{*wire_, hdr, database_info_, session_info_, session_store_, session_variable_set_, local_id_++};
ipc_response response_obj{wire_, hdr.get_idx(), writer_count_, [](){}};
ipc_response response_obj{wire_, hdr.get_idx(), [](){}};
if (! handshake(static_cast<tateyama::api::server::request*>(&request_obj), static_cast<tateyama::api::server::response*>(&response_obj))) {
return;
}
Expand Down Expand Up @@ -86,7 +86,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
switch (request->service_id()) {
case tateyama::framework::service_id_endpoint_broker:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), writer_count_, [](){});
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [](){});
// currently cancel request only
if (!endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response),
Expand All @@ -98,7 +98,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
}
case tateyama::framework::service_id_routing:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), writer_count_, [this, index](){remove_reqres(index);});
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
register_reqres(index,
std::dynamic_pointer_cast<tateyama::endpoint::common::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
Expand All @@ -121,7 +121,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
}
default:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), writer_count_, [this, index](){remove_reqres(index);});
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
if (!check_shutdown_request()) {
register_reqres(index,
std::dynamic_pointer_cast<tateyama::endpoint::common::request>(request),
Expand Down
7 changes: 2 additions & 5 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ class alignas(64) ipc_worker : public tateyama::endpoint::common::worker_common
const tateyama::endpoint::common::worker_common::configuration& conf,
std::size_t session_id,
std::shared_ptr<server_wire_container_impl> wire,
const tateyama::api::server::database_info& database_info,
std::size_t writer_count) :
const tateyama::api::server::database_info& database_info) :
worker_common(conf, session_id, ""),
service_(service),
wire_(std::move(wire)),
request_wire_container_(dynamic_cast<server_wire_container_impl::wire_container_impl*>(wire_->get_request_wire())),
database_info_(database_info),
writer_count_(writer_count) {
database_info_(database_info) {
}
void delete_hook() {
shutdown_complete();
Expand All @@ -53,7 +51,6 @@ class alignas(64) ipc_worker : public tateyama::endpoint::common::worker_common
std::shared_ptr<server_wire_container_impl> wire_;
server_wire_container_impl::wire_container_impl* request_wire_container_;
const tateyama::api::server::database_info& database_info_;
std::size_t writer_count_;

bool has_incomplete_resultset() override {
auto* gc = wire_->get_garbage_collector();
Expand Down
4 changes: 2 additions & 2 deletions src/tateyama/endpoint/ipc/ipc_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ void ipc_response::server_diagnostics(std::string_view diagnostic_record) {
server_wire_->get_response_wire().write(s.data(), tateyama::common::wire::response_header(index_, s.length(), RESPONSE_BODY));
}

tateyama::status ipc_response::acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch) {
tateyama::status ipc_response::acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch, std::size_t writer_count) {
if (completed_.load()) {
LOG_LP(ERROR) << "response is already completed";
set_state(state::acquire_failed);
return tateyama::status::unknown;
}
try {
data_channel_ = std::make_shared<ipc_data_channel>(server_wire_->create_resultset_wires(name, writer_count_), garbage_collector_);
data_channel_ = std::make_shared<ipc_data_channel>(server_wire_->create_resultset_wires(name, writer_count), garbage_collector_);
} catch (std::exception &ex) {
ch = nullptr;

Expand Down
8 changes: 2 additions & 6 deletions src/tateyama/endpoint/ipc/ipc_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ class alignas(64) ipc_response : public tateyama::endpoint::common::response {
friend ipc_data_channel;

public:
static constexpr std::size_t default_writer_count = 32;

ipc_response(std::shared_ptr<server_wire_container> server_wire, std::size_t index, std::size_t writer_count, std::function<void(void)> clean_up) :
ipc_response(std::shared_ptr<server_wire_container> server_wire, std::size_t index, std::function<void(void)> clean_up) :
tateyama::endpoint::common::response(index),
server_wire_(std::move(server_wire)),
garbage_collector_(*server_wire_->get_garbage_collector()),
writer_count_(writer_count),
clean_up_(std::move(clean_up)) {
}
~ipc_response() override {
Expand All @@ -117,13 +114,12 @@ class alignas(64) ipc_response : public tateyama::endpoint::common::response {
tateyama::status body(std::string_view body) override;
tateyama::status body_head(std::string_view body_head) override;
void error(proto::diagnostics::Record const& record) override;
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch) override;
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch, std::size_t writer_count) override;
tateyama::status release_channel(tateyama::api::server::data_channel& ch) override;

private:
std::shared_ptr<server_wire_container> server_wire_;
garbage_collector& garbage_collector_;
std::size_t writer_count_;
const std::function<void(void)> clean_up_;

void server_diagnostics(std::string_view diagnostic_record);
Expand Down
3 changes: 2 additions & 1 deletion src/tateyama/endpoint/loopback/loopback_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
namespace tateyama::endpoint::loopback {

tateyama::status loopback_response::acquire_channel(std::string_view name,
std::shared_ptr<tateyama::api::server::data_channel> &ch) {
std::shared_ptr<tateyama::api::server::data_channel> &ch,
[[maybe_unused]] std::size_t writer_count) {
std::unique_lock<std::mutex> lock(mtx_channel_map_);
if (channel_map_.find(name) != channel_map_.cend()) {
// already acquired the same name channel
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/loopback/loopback_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class loopback_response: public tateyama::api::server::response {
/**
* @see tateyama::server::response::acquire_channel()
*/
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel> &ch)
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel> &ch, std::size_t writer_count)
override;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/stream_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void stream_response::server_diagnostics(std::string_view diagnostic_record) {
stream_->send(index_, s, true);
}

tateyama::status stream_response::acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch) {
tateyama::status stream_response::acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch, [[maybe_unused]] std::size_t writer_count) {
try {
auto slot = stream_->look_for_slot();
data_channel_ = std::make_unique<stream_data_channel>(stream_, slot);
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/stream_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class alignas(64) stream_response : public tateyama::endpoint::common::response
tateyama::status body(std::string_view body) override;
tateyama::status body_head(std::string_view body_head) override;
void error(proto::diagnostics::Record const& record) override;
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch) override;
tateyama::status acquire_channel(std::string_view name, std::shared_ptr<tateyama::api::server::data_channel>& ch, std::size_t writer_count) override;
tateyama::status release_channel(tateyama::api::server::data_channel& ch) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_info_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ TEST_F(ipc_info_test, basic) {
session_name += std::to_string(my_session_id);
auto wire = std::make_shared<bootstrap::server_wire_container_impl>(session_name, "dummy_mutex_file_name", datachannel_buffer_size, 16);
const tateyama::endpoint::common::worker_common::configuration conf(tateyama::endpoint::common::worker_common::connection_type::ipc);
tateyama::endpoint::ipc::bootstrap::ipc_worker worker(service_, conf, my_session_id, wire, database_info_, writer_count);
tateyama::endpoint::ipc::bootstrap::ipc_worker worker(service_, conf, my_session_id, wire, database_info_);
tateyama::server::ipc_listener_for_test::run(worker);

// client part
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_resultset_multi_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class resultset_multi_service: public server_service_base {
EXPECT_GT(param.write_lens_.size(), 0);
//
std::shared_ptr<tateyama::api::server::data_channel> channel;
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel));
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel, default_writer_count));
EXPECT_NE(channel.get(), nullptr);
std::shared_ptr<tateyama::api::server::writer> writer;
EXPECT_EQ(tateyama::status::ok, channel->acquire(writer));
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_resultset_oneshot_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class resultset_oneshot_service: public server_service_base {
std::string payload { req->payload() };
std::size_t datalen = std::stoul(payload);
std::shared_ptr<tateyama::api::server::data_channel> channel;
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(resultset_name, channel));
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(resultset_name, channel, default_writer_count));
EXPECT_NE(channel.get(), nullptr);
std::shared_ptr<tateyama::api::server::writer> writer;
EXPECT_EQ(tateyama::status::ok, channel->acquire(writer));
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_resultset_skew_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class resultset_skew_service: public server_service_base {
auto len = param.write_lens_.at(0);
//
std::shared_ptr<tateyama::api::server::data_channel> channel;
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel));
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel, default_writer_count));
EXPECT_NE(channel.get(), nullptr);
std::shared_ptr<tateyama::api::server::writer> writer;
EXPECT_EQ(tateyama::status::ok, channel->acquire(writer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class resultset_writer_limit_service: public server_service_base {
const int nwriter = param.value_; // use as number of writers
//
std::shared_ptr<tateyama::api::server::data_channel> channel;
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel));
EXPECT_EQ(tateyama::status::ok, res->acquire_channel(param.name_, channel, default_writer_count));
EXPECT_NE(channel.get(), nullptr);
std::vector<std::shared_ptr<tateyama::api::server::writer>> writer_list { };
for (int i = 0; i < nwriter; i++) {
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_session_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class ipc_session_test : public ::testing::Test {
auto wire = std::make_shared<bootstrap::server_wire_container_impl>(session_name, "dummy_mutex_file_name", datachannel_buffer_size, 16);
session_bridge_ = std::make_shared<session::resource::bridge>();
const tateyama::endpoint::common::worker_common::configuration conf(tateyama::endpoint::common::worker_common::connection_type::ipc, session_bridge_);
worker_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_worker>(service_, conf, my_session_id, wire, database_info_, writer_count);
worker_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_worker>(service_, conf, my_session_id, wire, database_info_);
tateyama::server::ipc_listener_for_session_test::run(*worker_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
client_ = std::make_unique<ipc_client>(database_name, my_session_id);
Expand Down
2 changes: 1 addition & 1 deletion test/tateyama/endpoint/ipc/ipc_store_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ipc_store_test : public ::testing::Test {
auto wire = std::make_shared<bootstrap::server_wire_container_impl>(session_name, "dummy_mutex_file_name", datachannel_buffer_size, 16);
session_bridge_ = std::make_shared<session::resource::bridge>();
const tateyama::endpoint::common::worker_common::configuration conf(tateyama::endpoint::common::worker_common::connection_type::ipc, session_bridge_);
worker_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_worker>(service_, conf, my_session_id, wire, database_info_, writer_count);
worker_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_worker>(service_, conf, my_session_id, wire, database_info_);
tateyama::server::ipc_listener_for_store_test::run(*worker_);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
client_ = std::make_unique<ipc_client>(database_name, my_session_id);
Expand Down
3 changes: 3 additions & 0 deletions test/tateyama/endpoint/ipc/ipc_test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ class server_service_base: public tateyama::framework::service {
[[nodiscard]] std::string_view label() const noexcept override {
return "server_service_base";
}

protected:
static constexpr std::size_t default_writer_count = 32;
};

class resultset_param {
Expand Down
Loading

0 comments on commit 0489be5

Please sign in to comment.