Skip to content

Commit

Permalink
ensure that no more than one response message is returned for a singl…
Browse files Browse the repository at this point in the history
…e slot
  • Loading branch information
t-horikawa committed May 30, 2024
1 parent e5b9d70 commit 62cf7a8
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 18 deletions.
4 changes: 4 additions & 0 deletions src/tateyama/endpoint/common/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class response : public tateyama::api::server::response {
return completed_.load();
}

void set_completed() noexcept {
completed_.store(true);
}

protected:
std::size_t index_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes,misc-non-private-member-variables-in-classes)

Expand Down
3 changes: 2 additions & 1 deletion src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class worker_common {
}

bool endpoint_service(const std::shared_ptr<tateyama::api::server::request>& req,
[[maybe_unused]] const std::shared_ptr<tateyama::api::server::response>& res,
const std::shared_ptr<tateyama::endpoint::common::response>& res,
std::size_t slot) {
auto data = req->payload();
tateyama::proto::endpoint::request::Request rq{};
Expand All @@ -245,6 +245,7 @@ class worker_common {
case tateyama::proto::endpoint::request::Request::kCancel:
{
VLOG_LP(log_trace) << "received cancel request, slot = " << slot; //NOLINT
res->set_completed();
{
std::lock_guard<std::mutex> lock(mtx_reqreses_);
if (auto itr = reqreses_.find(slot); itr != reqreses_.end()) {
Expand Down
26 changes: 17 additions & 9 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,26 @@ void Worker::do_work() {

auto request = std::make_shared<ipc_request>(*wire_, hdr, database_info_, session_info_, session_store_);
std::size_t index = hdr.get_idx();
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
bool exit_frag = false;
switch (request->service_id()) {
case tateyama::framework::service_id_endpoint_broker:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [](){});
// currently cancel request only
if (!endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response),
index)) {
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
exit_frag = true;
}
break;

}
case tateyama::framework::service_id_routing:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (routing_service_chain(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
index)) {
Expand All @@ -103,9 +107,14 @@ void Worker::do_work() {
exit_frag = true;
}
break;

}
default:
{
auto response = std::make_shared<ipc_response>(wire_, hdr.get_idx(), [this, index](){remove_reqres(index);});
if (!check_shutdown_request()) {
register_reqres(index,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (!service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
Expand All @@ -115,14 +124,13 @@ void Worker::do_work() {
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "this session is already shutdown");
}
break;

}
}
if (exit_frag) {
break;
}
request->dispose();
request = nullptr;
response = nullptr;
wire_->get_garbage_collector()->dump();
} catch (std::runtime_error &e) {
LOG_LP(ERROR) << e.what();
Expand Down
25 changes: 17 additions & 8 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,25 @@ void stream_worker::do_work()
case tateyama::endpoint::stream::stream_socket::await_result::payload:
{
auto request = std::make_shared<stream_request>(*session_stream_, payload, database_info_, session_info_, session_store_);
auto response = std::make_shared<stream_response>(session_stream_, slot, [this, slot](){remove_reqres(slot);});
register_reqres(slot,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
switch (request->service_id()) {
case tateyama::framework::service_id_endpoint_broker:
{
auto response = std::make_shared<stream_response>(session_stream_, slot, [](){});
// currently cancel request only
if (endpoint_service(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response),
slot)) {
continue;
}
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
break;

}
case tateyama::framework::service_id_routing:
{
auto response = std::make_shared<stream_response>(session_stream_, slot, [this, slot](){remove_reqres(slot);});
register_reqres(slot,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if (routing_service_chain(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response),
slot)) {
Expand All @@ -107,9 +111,14 @@ void stream_worker::do_work()
}
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;

}
default:
{
auto response = std::make_shared<stream_response>(session_stream_, slot, [this, slot](){remove_reqres(slot);});
if (!check_shutdown_request()) {
register_reqres(slot,
std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::endpoint::common::response>(response));
if(service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
continue;
Expand All @@ -120,8 +129,8 @@ void stream_worker::do_work()
continue;
}
}
}
request = nullptr;
response = nullptr;
break;
}

Expand Down

0 comments on commit 62cf7a8

Please sign in to comment.