Skip to content

Commit

Permalink
Merge pull request #251 from project-tsurugi/wip/d_1036
Browse files Browse the repository at this point in the history
implement request service
  • Loading branch information
t-horikawa authored Dec 26, 2024
2 parents a5ec5e7 + 4d9f4d4 commit 6c915d3
Show file tree
Hide file tree
Showing 19 changed files with 980 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ file(GLOB SOURCES
"tateyama/metrics/*.cpp"
"tateyama/metrics/service/*.cpp"
"tateyama/metrics/resource/*.cpp"
"tateyama/request/service/*.cpp"
)

if (ENABLE_ALTIMETER)
Expand Down
74 changes: 74 additions & 0 deletions src/tateyama/endpoint/common/listener_common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <ostream>
#include <memory>
#include <chrono>
#include <functional>

#include <tateyama/api/server/request.h>

namespace tateyama::endpoint::common {

class listener_common {
public:
using callback = std::function<void(const std::shared_ptr<tateyama::api::server::request>&, std::chrono::system_clock::time_point)>;

/**
* @brief create empty object
*/
listener_common() = default;

/**
* @brief destruct the object
*/
virtual ~listener_common() = default;

listener_common(listener_common const& other) = default;
listener_common& operator=(listener_common const& other) = default;
listener_common(listener_common&& other) noexcept = default;
listener_common& operator=(listener_common&& other) noexcept = default;

/**
* @brief body of the process executed by the listener thread
*/
virtual void operator()() = 0;

/**
* @brief notify startup barrier of the listener bocomes ready
*/
virtual void arrive_and_wait() = 0;

/**
* @brief terminate the listener
*/
virtual void terminate() = 0;

/**
* @brief print diagnostics
* @param os the output stream
*/
virtual void print_diagnostic(std::ostream& os) = 0;

/**
* @brief apply callback function to ongoing request in the workers belonging to the listener
* @param func the callback function that receives session_id and request
*/
virtual void foreach_request(const callback& func) = 0;
};

} // tateyama::endpoint::common
13 changes: 11 additions & 2 deletions src/tateyama/endpoint/common/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
#pragma once

#include <chrono>

#include <tateyama/api/server/request.h>

#include "tateyama/status/resource/database_info_impl.h"
Expand All @@ -31,7 +33,8 @@ class request : public tateyama::api::server::request {
tateyama::api::server::session_store& session_store,
tateyama::session::session_variable_set& session_variable_set,
std::size_t local_id) :
database_info_(database_info), session_info_(session_info), session_store_(session_store), session_variable_set_(session_variable_set), local_id_(local_id) {
database_info_(database_info), session_info_(session_info), session_store_(session_store), session_variable_set_(session_variable_set),
local_id_(local_id), start_at_(std::chrono::system_clock::now()) {
}

[[nodiscard]] tateyama::api::server::database_info const& database_info() const noexcept override {
Expand All @@ -54,6 +57,10 @@ class request : public tateyama::api::server::request {
return local_id_;
}

[[nodiscard]] std::chrono::system_clock::time_point start_at() const noexcept {
return start_at_;
}

protected:
const tateyama::api::server::database_info& database_info_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

Expand All @@ -64,7 +71,9 @@ class request : public tateyama::api::server::request {
tateyama::session::session_variable_set& session_variable_set_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

private:
std::size_t local_id_{};
std::size_t local_id_;

std::chrono::system_clock::time_point start_at_;
};

} // tateyama::endpoint::common
27 changes: 24 additions & 3 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include "request.h"
#include "response.h"
#include "listener_common.h"
#include "session_info_impl.h"

namespace tateyama::endpoint::common {
Expand Down Expand Up @@ -174,6 +175,26 @@ class worker_common {
}
}

/**
* @brief apply callback function to ongoing request
* @param func the callback function
*/
void foreach_request(const listener_common::callback& func) {
std::vector<std::shared_ptr<tateyama::endpoint::common::request>> targets{};
{
std::lock_guard<std::mutex> lock(mtx_reqreses_);
for (auto itr{reqreses_.begin()}, end{reqreses_.end()}; itr != end; itr++) {
auto& ptr = itr->second.first;
if (ptr.use_count() > 1) {
targets.emplace_back(ptr);
}
}
}
for (auto&& e: targets) {
func(std::dynamic_pointer_cast<tateyama::api::server::request>(e), e->start_at());
}
}

protected:
const connection_type connection_type_; // NOLINT
const std::size_t session_id_; // NOLINT
Expand Down Expand Up @@ -413,12 +434,12 @@ class worker_common {
}
}

void register_reqres(std::size_t slot, const std::shared_ptr<tateyama::api::server::request>& request, const std::shared_ptr<tateyama::endpoint::common::response>& response) noexcept {
void register_reqres(std::size_t slot, const std::shared_ptr<tateyama::endpoint::common::request>& request, const std::shared_ptr<tateyama::endpoint::common::response>& response) noexcept {
std::lock_guard<std::mutex> lock(mtx_reqreses_);
if (auto itr = reqreses_.find(slot); itr != reqreses_.end()) {
reqreses_.erase(itr);
}
reqreses_.emplace(slot, std::pair<std::shared_ptr<tateyama::api::server::request>, std::shared_ptr<tateyama::endpoint::common::response>>(request, response));
reqreses_.emplace(slot, std::pair<std::shared_ptr<tateyama::endpoint::common::request>, std::shared_ptr<tateyama::endpoint::common::response>>(request, response));
}

void remove_reqres(std::size_t slot) noexcept {
Expand Down Expand Up @@ -534,7 +555,7 @@ class worker_common {
bool enable_timeout_;
std::chrono::seconds refresh_timeout_;
std::chrono::seconds max_refresh_timeout_;
std::map<std::size_t, std::pair<std::shared_ptr<tateyama::api::server::request>, std::shared_ptr<tateyama::endpoint::common::response>>> reqreses_{};
std::map<std::size_t, std::pair<std::shared_ptr<tateyama::endpoint::common::request>, std::shared_ptr<tateyama::endpoint::common::response>>> reqreses_{};
std::mutex mtx_reqreses_{};
std::vector<std::shared_ptr<tateyama::api::server::response>> shutdown_response_{};
bool cancel_requested_to_all_responses_{};
Expand Down
12 changes: 9 additions & 3 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_endpoint.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,9 +15,13 @@
*/
#pragma once

#include <memory>

#include <tateyama/api/configuration.h>
#include <tateyama/framework/environment.h>
#include <tateyama/framework/endpoint.h>
#include <tateyama/status/resource/bridge.h>
#include <tateyama/request/service/bridge.h>
#include <tateyama/diagnostic/resource/diagnostic_resource.h>

#include "ipc_listener.h"
Expand Down Expand Up @@ -56,7 +60,7 @@ class ipc_endpoint : public endpoint {
bool setup(environment& env) override {
try {
// create listener object
listener_ = std::make_unique<tateyama::endpoint::ipc::bootstrap::ipc_listener>(env);
listener_ = std::make_shared<tateyama::endpoint::ipc::bootstrap::ipc_listener>(env);
return true;
} catch (std::exception &ex) {
LOG_LP(ERROR) << ex.what();
Expand All @@ -70,6 +74,8 @@ class ipc_endpoint : public endpoint {
bool start(environment& env) override {
listener_thread_ = std::thread(std::ref(*listener_));
listener_->arrive_and_wait();
auto request_service = env.service_repository().find<tateyama::request::service::bridge>();
request_service->register_endpoint_listener(listener_);
auto diagnostic_resource = env.resource_repository().find<tateyama::diagnostic::resource::diagnostic_resource>();
diagnostic_resource->add_print_callback("tateyama_ipc_endpoint", [this](std::ostream& os) {
listener_->print_diagnostic(os);
Expand Down Expand Up @@ -102,7 +108,7 @@ class ipc_endpoint : public endpoint {
}

private:
std::unique_ptr<tateyama::endpoint::ipc::bootstrap::ipc_listener> listener_;
std::shared_ptr<tateyama::endpoint::ipc::bootstrap::ipc_listener> listener_;
std::thread listener_thread_;
};

Expand Down
29 changes: 19 additions & 10 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <tateyama/api/configuration.h>
#include <tateyama/session/resource/bridge.h>

#include "tateyama/endpoint/common/listener_common.h"
#include "tateyama/endpoint/common/logging.h"
#include "tateyama/endpoint/common/pointer_comp.h"
#include "tateyama/endpoint/ipc/metrics/ipc_metrics.h"
Expand All @@ -45,7 +46,7 @@ namespace tateyama::endpoint::ipc::bootstrap {
/**
* @brief ipc listener
*/
class ipc_listener {
class ipc_listener : public tateyama::endpoint::common::listener_common {
public:
explicit ipc_listener(tateyama::framework::environment& env)
: cfg_(env.configuration()),
Expand Down Expand Up @@ -176,7 +177,7 @@ class ipc_listener {
}
}

void operator()() {
void operator()() override {
auto& connection_queue = container_->get_connection_queue();
proc_mutex_file_ = status_->mutex_file();
arrive_and_wait();
Expand Down Expand Up @@ -241,18 +242,15 @@ class ipc_listener {
confirm_workers_termination();
}

void terminate() {
container_->get_connection_queue().request_terminate();
void arrive_and_wait() override {
sync.wait();
}

void arrive_and_wait() {
sync.wait();
void terminate() override {
container_->get_connection_queue().request_terminate();
}

/**
* @brief print diagnostics
*/
void print_diagnostic(std::ostream& os) {
void print_diagnostic(std::ostream& os) override {
os << "/:tateyama:ipc_endpoint print diagnostics start" << std::endl;

std::unique_lock<std::mutex> lock(mtx_workers_);
Expand All @@ -270,6 +268,17 @@ class ipc_listener {
os << "/:tateyama:ipc_endpoint print diagnostics end" << std::endl;
}

void foreach_request(const callback& func) override {
std::unique_lock<std::mutex> lock(mtx_workers_);
for (auto && e : workers_) {
if (std::shared_ptr<ipc_worker> worker = e; worker) {
if (!worker->is_terminated()) {
worker->foreach_request(func);
}
}
}
}

private:
const std::shared_ptr<api::configuration::whole> cfg_;
const std::shared_ptr<framework::routing_service> router_;
Expand Down
4 changes: 2 additions & 2 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), writer_count_, [this, index](){remove_reqres(index);});
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (routing_service_chain(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
Expand All @@ -124,7 +124,7 @@ void ipc_worker::run() { // NOLINT(readability-function-cognitive-complexity)
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), writer_count_, [this, index](){remove_reqres(index);});
if (!check_shutdown_request()) {
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (!service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
Expand Down
21 changes: 16 additions & 5 deletions src/tateyama/endpoint/stream/bootstrap/stream_endpoint.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,9 +15,14 @@
*/
#pragma once

#include <memory>

#include <tateyama/api/configuration.h>
#include <tateyama/framework/endpoint.h>
#include <tateyama/framework/environment.h>
#include <tateyama/framework/endpoint.h>
#include <tateyama/request/service/bridge.h>
#include <tateyama/diagnostic/resource/diagnostic_resource.h>

#include "stream_listener.h"

namespace tateyama::framework {
Expand Down Expand Up @@ -58,7 +63,7 @@ class stream_endpoint : public endpoint {
}
if (enabled_) {
// create listener object
listener_ = std::make_unique<tateyama::endpoint::stream::bootstrap::stream_listener>(env);
listener_ = std::make_shared<tateyama::endpoint::stream::bootstrap::stream_listener>(env);
}
return true;
} catch (std::exception &ex) {
Expand All @@ -70,10 +75,16 @@ class stream_endpoint : public endpoint {
/**
* @brief start the component (the state will be `activated`)
*/
bool start(environment&) override {
bool start(environment& env) override {
if (enabled_) {
listener_thread_ = std::thread(std::ref(*listener_));
listener_->arrive_and_wait();
auto request_service = env.service_repository().find<tateyama::request::service::bridge>();
request_service->register_endpoint_listener(listener_);
auto diagnostic_resource = env.resource_repository().find<tateyama::diagnostic::resource::diagnostic_resource>();
diagnostic_resource->add_print_callback("tateyama_stream_endpoint", [this](std::ostream& os) {
listener_->print_diagnostic(os);
});
}
return true;
}
Expand Down Expand Up @@ -103,7 +114,7 @@ class stream_endpoint : public endpoint {
}

private:
std::unique_ptr<tateyama::endpoint::stream::bootstrap::stream_listener> listener_;
std::shared_ptr<tateyama::endpoint::stream::bootstrap::stream_listener> listener_;
std::thread listener_thread_;
bool enabled_{true};
};
Expand Down
Loading

0 comments on commit 6c915d3

Please sign in to comment.