Skip to content

Commit

Permalink
add session_test for IPC/TCP and make it pass
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Mar 14, 2024
1 parent dac20d5 commit 78ee4e9
Show file tree
Hide file tree
Showing 16 changed files with 389 additions and 165 deletions.
70 changes: 51 additions & 19 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,24 @@
namespace tateyama::endpoint::common {

class worker_common {
public:
protected:
enum class shutdown_consequence : std::uint32_t {
/**
* @brief no shutdown, continue normal processing.
*/
keep_working = 0U,

/**
* @brief postpone the shutdown.
*/
postpone,

/**
* @brief shutdown immediately.
*/
immediate,
};

enum class connection_type : std::uint32_t {
/**
* @brief undefined type.
Expand All @@ -61,6 +78,7 @@ class worker_common {
stream,
};

public:
worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, std::shared_ptr<tateyama::session::resource::bridge> session)
: connection_type_(con),
session_id_(session_id),
Expand Down Expand Up @@ -117,7 +135,7 @@ class worker_common {

// for session management
const std::shared_ptr<tateyama::session::resource::bridge> session_; // NOLINT
bool cancel_requested_to_all_responses{}; // NOLINT
bool cancel_requested_to_all_responses_{}; // NOLINT

bool handshake(tateyama::api::server::request* req, tateyama::api::server::response* res) {
if (req->service_id() != tateyama::framework::service_id_endpoint_broker) {
Expand Down Expand Up @@ -251,30 +269,44 @@ class worker_common {
responses_.erase(slot);
}

bool handle_shutdown() {
if (auto type = session_context_->shutdown_request(); type != tateyama::session::shutdown_request_type::nothing) {
if (type == tateyama::session::shutdown_request_type::forceful && !cancel_requested_to_all_responses) {
foreach_response([](tateyama::endpoint::common::response& e){e.cancel();});
cancel_requested_to_all_responses = true;
shutdown_consequence handle_shutdown() {
switch (session_context_->shutdown_request()) {
case tateyama::session::shutdown_request_type::graceful:
if (has_incomplete_response()) {
return shutdown_consequence::postpone;
}
return shutdown_consequence::immediate;
case tateyama::session::shutdown_request_type::forceful:
if (!cancel_requested_to_all_responses_) {
foreach_response([this](tateyama::endpoint::common::response& e){
e.cancel();
notify_client(&e, tateyama::proto::diagnostics::Code::OPERATION_CANCELED, "");
});
cancel_requested_to_all_responses_ = true;
}
return true;
return shutdown_consequence::immediate;
default:
return shutdown_consequence::keep_working;
}
return false;
}

bool foreach_response(const std::function<void(tateyama::endpoint::common::response&)>& func) {
bool rv{false};
std::lock_guard<std::mutex> lock(mtx_responses_);
for (auto it{responses_.begin()}, end{responses_.end()}; it != end; ) {
if (auto r = it->second.lock(); r) {
func(*r);
rv = true;
++it;
} else {
it = responses_.erase(it);
std::vector<std::shared_ptr<tateyama::endpoint::common::response>> targets{};
{
std::lock_guard<std::mutex> lock(mtx_responses_);
for (auto it{responses_.begin()}, end{responses_.end()}; it != end; ) {
if (auto r = it->second.lock(); r) {
targets.emplace_back(r);
++it;
} else {
it = responses_.erase(it);
}
}
}
return rv;
for (auto &&e: targets) {
func(*e);
}
return !targets.empty();
}

bool shutdown_request(tateyama::session::shutdown_request_type type) noexcept {
Expand Down
8 changes: 4 additions & 4 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ class server_wire_container_impl : public server_wire_container
std::lock_guard<std::mutex> lock(mtx_);
wire_->write(bip_buffer_, from, header);
}
void set_shutdown() {
wire_->set_shutdown();
void notify_shutdown() override {
wire_->notify_shutdown();
}

// for client
Expand Down Expand Up @@ -555,9 +555,9 @@ class server_wire_container_impl : public server_wire_container
return garbage_collector_impl_.get();
}

void notify() {
void notify_shutdown() {
request_wire_.notify();
response_wire_.set_shutdown();
response_wire_.notify_shutdown();
}

[[nodiscard]] bool terminate_requested() const {
Expand Down
31 changes: 23 additions & 8 deletions src/tateyama/endpoint/ipc/bootstrap/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ void Worker::run() {
while(true) {
auto hdr = request_wire_container_->peep();
if (hdr.get_length() == 0 && hdr.get_idx() == tateyama::common::wire::message_header::null_request) {
if (handle_shutdown() == shutdown_consequence::immediate) {
VLOG_LP(log_trace) << "received and completed shutdown request: session_id = " << std::to_string(session_id_);
return;
}
if (request_wire_container_->terminate_requested()) {
VLOG_LP(log_trace) << "received shutdown request: session_id = " << std::to_string(session_id_);
return;
Expand All @@ -52,7 +56,8 @@ void Worker::run() {
try {
auto h = request_wire_container_->peep();
if (h.get_length() == 0 && h.get_idx() == tateyama::common::wire::message_header::null_request) {
if (handle_shutdown()) {
if (handle_shutdown() == shutdown_consequence::immediate) {
wire_->get_response_wire().notify_shutdown();
VLOG_LP(log_trace) << "received and completed shutdown request: session_id = " << std::to_string(session_id_);
break;
}
Expand All @@ -65,19 +70,26 @@ void Worker::run() {
auto request = std::make_shared<ipc_request>(*wire_, h, database_info_, session_info_);
std::size_t index = h.get_idx();
auto response = std::make_shared<ipc_response>(wire_, h.get_idx(), [this, index](){remove_response(index);});
bool break_while{false};
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
if (!handle_shutdown()) {
switch (handle_shutdown()) {
case shutdown_consequence::keep_working:
register_response(index, static_cast<std::shared_ptr<tateyama::endpoint::common::response>>(response));
if (!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
break_while = true;
}
} else {
break;
case shutdown_consequence::postpone:
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
if (!has_incomplete_response()) {
break;
}
break;
case shutdown_consequence::immediate:
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
wire_->get_response_wire().notify_shutdown();
break_while = true;
VLOG_LP(log_trace) << "received and completed shutdown request: session_id = " << std::to_string(session_id_);
break;
}
} else {
if (!endpoint_service(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
Expand All @@ -87,6 +99,9 @@ void Worker::run() {
break;
}
}
if (break_while) {
break;
}
request->dispose();
request = nullptr;
wire_->get_garbage_collector()->dump();
Expand All @@ -106,7 +121,7 @@ bool Worker::terminate(tateyama::session::shutdown_request_type type) {
VLOG_LP(log_trace) << "send terminate request: session_id = " << std::to_string(session_id_);

auto rv = shutdown_request(type);
wire_->notify();
wire_->notify_shutdown();
return rv;
}

Expand Down
1 change: 1 addition & 0 deletions src/tateyama/endpoint/ipc/server_wires.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class server_wire_container
response_wire_container& operator = (response_wire_container&&) = default;

virtual void write(const char*, tateyama::common::wire::response_header) = 0;
virtual void notify_shutdown() = 0;
};
class resultset_wire_container;
using resultset_wire_deleter_type = void(*)(resultset_wire_container*);
Expand Down
6 changes: 3 additions & 3 deletions src/tateyama/endpoint/ipc/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ inline static std::int64_t n_cap(std::int64_t timeout) {

// for request
class unidirectional_message_wire : public simple_wire<message_header> {
constexpr static std::size_t watch_interval = 5;
constexpr static std::size_t watch_interval = 2;
public:
unidirectional_message_wire(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::size_t capacity) : simple_wire<message_header>(managed_shm_ptr, capacity) {}

Expand Down Expand Up @@ -497,7 +497,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
[[nodiscard]] response_header::msg_type get_type() const {
return header_received_.get_type();
}
void set_shutdown() noexcept {
void notify_shutdown() noexcept {
shutdown_.store(true);
}

Expand All @@ -509,7 +509,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
c_empty_.notify_one();
}
}
[[nodiscard]] bool get_shutdown() const noexcept {
[[nodiscard]] bool check_shutdown() const noexcept {
return shutdown_.load();
}

Expand Down
34 changes: 20 additions & 14 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ void stream_worker::run()
session_stream_->close();
return;
}

session_stream_->change_slot_size(max_result_sets_);
break;
}
Expand All @@ -77,24 +78,28 @@ void stream_worker::run()
std::string payload{};

switch (session_stream_->await(slot, payload)) {

case tateyama::endpoint::stream::stream_socket::await_result::payload:
{
auto request = std::make_shared<stream_request>(*session_stream_, payload, database_info_, session_info_);
auto response = std::make_shared<stream_response>(session_stream_, slot, [this, slot](){remove_response(slot);});
bool break_while{false};
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
if (!handle_shutdown()) {
switch (handle_shutdown()) {
case shutdown_consequence::keep_working:
register_response(slot, static_cast<std::shared_ptr<tateyama::endpoint::common::response>>(response));
if(!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
break_while = true;
}
} else {
break;
case shutdown_consequence::postpone:
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
if (!has_incomplete_response()) {
break;
}
break;
case shutdown_consequence::immediate:
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
break_while = true;
break;
}
} else {
if (!endpoint_service(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
Expand All @@ -105,25 +110,26 @@ void stream_worker::run()
}
}
request = nullptr;
continue;
if (!break_while) {
continue;
}
break;
}

case tateyama::endpoint::stream::stream_socket::await_result::timeout:
{
if (handle_shutdown()) {
if (handle_shutdown() == shutdown_consequence::immediate) {
VLOG_LP(log_trace) << "received and completed shutdown request: session_id = " << std::to_string(session_id_);
break;
}
continue;
}

default:
session_stream_->close();
default: // some error
break;
}

break;
}
session_stream_->close();

#ifdef ENABLE_ALTIMETER
tateyama::endpoint::altimeter::session_end(database_info_, session_info_);
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class stream_socket
private:
int socket_;
static constexpr std::size_t N_FDS = 1;
static constexpr int TIMEOUT_MS = 1000; // 1000(mS)
static constexpr int TIMEOUT_MS = 2000; // 2000(mS)
struct pollfd fds_[N_FDS]{}; // NOLINT

bool session_closed_{false};
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/session/resource/bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ std::optional<tateyama::proto::session::diagnostic::ErrorCode> bridge::get(std::
return tateyama::proto::session::diagnostic::ErrorCode::SESSION_NOT_FOUND;
}

std::optional<tateyama::proto::session::diagnostic::ErrorCode> bridge::shutdown(std::string_view session_specifier, [[maybe_unused]] shutdown_request_type type) {
std::optional<tateyama::proto::session::diagnostic::ErrorCode> bridge::shutdown(std::string_view session_specifier, shutdown_request_type type) {
session_context::numeric_id_type numeric_id{};
try {
auto opt = find_only_one_session(session_specifier, numeric_id);
Expand Down
21 changes: 9 additions & 12 deletions test/tateyama/endpoint/ipc/ipc_client.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 Project Tsurugi.
* Copyright 2018-2024 Project Tsurugi.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,8 +15,6 @@
*/
#include "ipc_client.h"

#include <gtest/gtest.h>

namespace tateyama::endpoint::ipc {

ipc_client::ipc_client(std::shared_ptr<tateyama::api::configuration::whole> const &cfg, tateyama::proto::endpoint::request::Handshake& hs)
Expand Down Expand Up @@ -45,7 +43,7 @@ ipc_client::ipc_client(std::string_view name, std::size_t session_id, tateyama::
handshake();
}

static constexpr tsubakuro::common::wire::message_header::index_type ipc_test_index = 1234;
static constexpr tsubakuro::common::wire::message_header::index_type ipc_test_index = 135;

void ipc_client::send(const std::size_t tag, const std::string &message) {
request_header_content hdr { session_id_, tag };
Expand Down Expand Up @@ -78,13 +76,11 @@ bool parse_response_header(std::string_view input, parse_response_result &result
}

void ipc_client::receive(std::string &message) {
receive(message, static_cast<tateyama::proto::framework::response::Header::PayloadType>(0), false);
}
void ipc_client::receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType type) {
receive(message, type, true);
tateyama::proto::framework::response::Header::PayloadType type{};
receive(message, type);
}

void ipc_client::receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType type, bool do_check) {
void ipc_client::receive(std::string &message, tateyama::proto::framework::response::Header::PayloadType &type) {
tsubakuro::common::wire::response_header header;
int ntry = 0;
bool ok = false;
Expand All @@ -94,6 +90,9 @@ void ipc_client::receive(std::string &message, tateyama::proto::framework::respo
header = response_wire_->await();
ok = true;
} catch (const std::runtime_error &ex) {
if (response_wire_->check_shutdown()) {
throw std::runtime_error("server shutdown");
}
std::cout << ex.what() << std::endl;
ntry++;
if (ntry >= 100) {
Expand All @@ -109,12 +108,10 @@ void ipc_client::receive(std::string &message, tateyama::proto::framework::respo
//
parse_response_result result;
parse_response_header(r_msg, result);
if (do_check) {
EXPECT_EQ(type, result.payload_type_);
}
// ASSERT_TRUE(parse_response_header(r_msg, result));
// EXPECT_EQ(session_id_, result.session_id_);
message = result.payload_;
type = result.payload_type_;
}

resultset_wires_container* ipc_client::create_resultset_wires() {
Expand Down
Loading

0 comments on commit 78ee4e9

Please sign in to comment.