Skip to content

Commit

Permalink
revise ipc_data_channel termination process
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Nov 27, 2024
1 parent 2fb8519 commit 706bc9c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 39 deletions.
20 changes: 9 additions & 11 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "tateyama/endpoint/ipc/wire.h"
#include "tateyama/endpoint/ipc/server_wires.h"
#include "tateyama/endpoint/ipc/ipc_response.h"

namespace tateyama::endpoint::ipc::bootstrap {

Expand Down Expand Up @@ -404,7 +403,7 @@ class server_wire_container_impl : public server_wire_container
garbage_collector_impl() = default;
~garbage_collector_impl() override {
std::lock_guard<std::mutex> lock(mtx_put_);
data_channel_set_.clear();
resultset_wires_set_.clear();
}

/**
Expand All @@ -415,19 +414,18 @@ class server_wire_container_impl : public server_wire_container
garbage_collector_impl& operator = (garbage_collector_impl const&) = delete;
garbage_collector_impl& operator = (garbage_collector_impl&&) = delete;

void put(std::shared_ptr<ipc_data_channel> ch) override {
void put(unq_p_resultset_wires_conteiner wires) override {
std::lock_guard<std::mutex> lock(mtx_put_);
data_channel_set_.emplace(std::move(ch));
resultset_wires_set_.emplace(std::move(wires));
}
void dump() override {
if (mtx_dump_.try_lock()) {
std::lock_guard<std::mutex> lock(mtx_put_);

auto it = data_channel_set_.begin();
while (it != data_channel_set_.end()) {
auto* wc = (*it)->resultset_wires_conteiner();
if (wc->is_closed() && wc->is_disposable()) {
data_channel_set_.erase(it++);
auto it = resultset_wires_set_.begin();
while (it != resultset_wires_set_.end()) {
if ((*it)->is_closed() && (*it)->is_disposable()) {
resultset_wires_set_.erase(it++);
} else {
it++;
}
Expand All @@ -437,11 +435,11 @@ class server_wire_container_impl : public server_wire_container
}
bool empty() override {
std::lock_guard<std::mutex> lock(mtx_put_);
return data_channel_set_.empty();
return resultset_wires_set_.empty();
}

private:
std::set<std::shared_ptr<ipc_data_channel>> data_channel_set_{};
std::set<unq_p_resultset_wires_conteiner> resultset_wires_set_{};
std::mutex mtx_put_{};
std::mutex mtx_dump_{};
};
Expand Down
34 changes: 22 additions & 12 deletions src/tateyama/endpoint/ipc/ipc_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ tateyama::status ipc_response::body(std::string_view body) {
bool expected = false;
if (completed_.compare_exchange_strong(expected, true)) {
VLOG_LP(log_trace) << static_cast<const void*>(server_wire_.get()) << " length = " << body.length() << " slot = " << index_; //NOLINT
if (data_channel_) {
std::dynamic_pointer_cast<ipc_data_channel>(data_channel_)->shutdown(); // Guard against improper operation
}
clean_up_();

std::stringstream ss{};
Expand Down Expand Up @@ -71,6 +74,9 @@ void ipc_response::error(proto::diagnostics::Record const& record) {
bool expected = false;
if (completed_.compare_exchange_strong(expected, true)) {
VLOG_LP(log_trace) << static_cast<const void*>(server_wire_.get()) << " slot = " << index_; //NOLINT
if (data_channel_) {
std::dynamic_pointer_cast<ipc_data_channel>(data_channel_)->shutdown(); // Guard against improper operation
}
clean_up_();

std::string s{};
Expand Down Expand Up @@ -103,7 +109,7 @@ tateyama::status ipc_response::acquire_channel(std::string_view name, std::share
return tateyama::status::unknown;
}
try {
data_channel_ = std::make_shared<ipc_data_channel>(server_wire_->create_resultset_wires(name, writer_count_));
data_channel_ = std::make_shared<ipc_data_channel>(server_wire_->create_resultset_wires(name, writer_count_), garbage_collector_);
} catch (std::exception &ex) {
ch = nullptr;

Expand All @@ -123,13 +129,8 @@ tateyama::status ipc_response::release_channel(tateyama::api::server::data_chann
VLOG_LP(log_trace) << static_cast<const void*>(server_wire_.get()) << " data_channel_ = " << static_cast<const void*>(data_channel_.get()); //NOLINT

if (data_channel_.get() == &ch) {
auto ipc_data_channel_ptr = std::dynamic_pointer_cast<ipc_data_channel>(data_channel_);
ipc_data_channel_ptr->set_eor();
ipc_data_channel_ptr->release();
if (!ipc_data_channel_ptr->is_closed()) {
garbage_collector_.put(std::move(ipc_data_channel_ptr));
}
data_channel_ = nullptr; // Seems unnecessary, but do it just in case.
std::dynamic_pointer_cast<ipc_data_channel>(data_channel_)->shutdown();
data_channel_ = nullptr;
return tateyama::status::ok;
}
LOG_LP(ERROR) << "the parameter given (tateyama::api::server::data_channel& ch) does not match the data_channel_ this object has";
Expand Down Expand Up @@ -171,11 +172,20 @@ tateyama::status ipc_data_channel::release(tateyama::api::server::writer& wrt) {
return tateyama::status::unknown;
}

void ipc_data_channel::release() {
void ipc_data_channel::shutdown() {
std::unique_lock lock{mutex_};
for (auto&& it = data_writers_.begin(), last = data_writers_.end(); it != last;) {
(*it)->release();
it = data_writers_.erase(it);

if (resultset_wires_) {
resultset_wires_->set_eor();
for (auto&& it = data_writers_.begin(), last = data_writers_.end(); it != last;) {
(*it)->release();
it = data_writers_.erase(it);
}
if (!resultset_wires_->is_closed()) {
garbage_collector_.put(std::move(resultset_wires_));
} else {
resultset_wires_ = nullptr;
}
}
}

Expand Down
18 changes: 7 additions & 11 deletions src/tateyama/endpoint/ipc/ipc_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ class alignas(64) ipc_writer : public tateyama::api::server::writer {

tateyama::status write(char const* data, std::size_t length) override;
tateyama::status commit() override;
void release();

private:
server_wire_container::unq_p_resultset_wire_conteiner resultset_wire_;
std::atomic_bool released_{};

void release();
};

/**
Expand All @@ -60,12 +61,11 @@ class alignas(64) ipc_data_channel : public tateyama::api::server::data_channel
friend ipc_response;

public:
explicit ipc_data_channel(server_wire_container::unq_p_resultset_wires_conteiner resultset_wires)
: resultset_wires_(std::move(resultset_wires)) {
explicit ipc_data_channel(server_wire_container::unq_p_resultset_wires_conteiner resultset_wires, garbage_collector& gc)
: resultset_wires_(std::move(resultset_wires)), garbage_collector_(gc) {
}
~ipc_data_channel() override {
std::unique_lock lock{mutex_};
resultset_wires_ = nullptr;
shutdown();
}

ipc_data_channel(ipc_data_channel const&) = delete;
Expand All @@ -75,19 +75,15 @@ class alignas(64) ipc_data_channel : public tateyama::api::server::data_channel

tateyama::status acquire(std::shared_ptr<tateyama::api::server::writer>& wrt) override;
tateyama::status release(tateyama::api::server::writer& wrt) override;
void set_eor() { return resultset_wires_->set_eor(); }
bool is_closed() { return resultset_wires_->is_closed(); }
server_wire_container::unq_p_resultset_wires_conteiner::pointer resultset_wires_conteiner() {
return resultset_wires_.get();
}

private:
server_wire_container::unq_p_resultset_wires_conteiner resultset_wires_;
garbage_collector& garbage_collector_;

std::set<std::shared_ptr<ipc_writer>, tateyama::endpoint::common::pointer_comp<ipc_writer>> data_writers_{};
std::mutex mutex_{};

void release();
void shutdown();
};

/**
Expand Down
4 changes: 1 addition & 3 deletions src/tateyama/endpoint/ipc/server_wires.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

namespace tateyama::endpoint::ipc {

class ipc_data_channel;

class server_wire_container
{
public:
Expand Down Expand Up @@ -98,7 +96,7 @@ class server_wire_container
* @brief try to dispose remaining resultset_wire
* @returns true if garbage_collector has no remaining resultset_wire.
*/
virtual void put(std::shared_ptr<ipc_data_channel>) = 0;
virtual void put(unq_p_resultset_wires_conteiner) = 0;
virtual void dump() = 0;
virtual bool empty() = 0;
};
Expand Down
4 changes: 2 additions & 2 deletions test/tateyama/endpoint/result_set_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class result_set_test : public ::testing::Test {
w->write(r_.data(), r_.length());
w->commit();

res->body(response_test_message_);
EXPECT_EQ(dc->release(*w), tateyama::status::ok);
EXPECT_EQ(res->release_channel(*dc), tateyama::status::ok);
res->body(response_test_message_);

return 0;
}
Expand Down Expand Up @@ -180,9 +180,9 @@ TEST_F(result_set_test, large) {
w->commit();
}

res->body(response_test_message_);
EXPECT_EQ(dc->release(*w), tateyama::status::ok);
EXPECT_EQ(res->release_channel(*dc), tateyama::status::ok);
res->body(response_test_message_);


// client side
Expand Down

0 comments on commit 706bc9c

Please sign in to comment.