Skip to content

Commit

Permalink
protect resultset_wires in ipc_data_channel with mutex and some refac…
Browse files Browse the repository at this point in the history
…toring
  • Loading branch information
t-horikawa committed Nov 4, 2024
1 parent c36b229 commit 4341a51
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
12 changes: 6 additions & 6 deletions src/tateyama/endpoint/ipc/ipc_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ tateyama::status ipc_response::release_channel(tateyama::api::server::data_chann
VLOG_LP(log_trace) << static_cast<const void*>(server_wire_.get()) << " data_channel_ = " << static_cast<const void*>(data_channel_.get()); //NOLINT

if (data_channel_.get() == &ch) {
auto *data_channel = dynamic_cast<ipc_data_channel*>(data_channel_.get());
data_channel->set_eor();
data_channel->release();
if (!data_channel->is_closed()) {
garbage_collector_->put(data_channel->get_resultset_wires());
auto *ipc_data_channel_ptr = dynamic_cast<ipc_data_channel*>(data_channel_.get());
ipc_data_channel_ptr->set_eor();
ipc_data_channel_ptr->release();
if (!ipc_data_channel_ptr->is_closed()) {
ipc_data_channel_ptr->defer_resultset_delete(garbage_collector_);
}
data_channel_ = nullptr;
return tateyama::status::ok;
Expand All @@ -137,7 +137,7 @@ tateyama::status ipc_response::release_channel(tateyama::api::server::data_chann
// class ipc_data_channel
tateyama::status ipc_data_channel::acquire(std::shared_ptr<tateyama::api::server::writer>& wrt) {
try {
if (auto ipc_wrt = std::make_shared<ipc_writer>(data_channel_->acquire()); ipc_wrt != nullptr) {
if (auto ipc_wrt = std::make_shared<ipc_writer>(resultset_wires_->acquire()); ipc_wrt != nullptr) {
wrt = ipc_wrt;
VLOG_LP(log_trace) << " data_channel_ = " << static_cast<const void*>(this) << " writer = " << static_cast<const void*>(wrt.get()); //NOLINT
{
Expand Down
30 changes: 22 additions & 8 deletions src/tateyama/endpoint/ipc/ipc_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,32 @@ class alignas(64) ipc_data_channel : public tateyama::api::server::data_channel
friend ipc_response;

public:
explicit ipc_data_channel(server_wire_container::unq_p_resultset_wires_conteiner data_channel)
: data_channel_(std::move(data_channel)) {
explicit ipc_data_channel(server_wire_container::unq_p_resultset_wires_conteiner resultset_wires)
: resultset_wires_(std::move(resultset_wires)) {
}
~ipc_data_channel() override {
std::unique_lock lock{mutex_};
resultset_wires_ = nullptr;
}

ipc_data_channel(ipc_data_channel const&) = delete;
ipc_data_channel(ipc_data_channel&&) = delete;
ipc_data_channel& operator = (ipc_data_channel const&) = delete;
ipc_data_channel& operator = (ipc_data_channel&&) = delete;

tateyama::status acquire(std::shared_ptr<tateyama::api::server::writer>& wrt) override;
tateyama::status release(tateyama::api::server::writer& wrt) override;
void set_eor() { return data_channel_->set_eor(); }
bool is_closed() { return data_channel_->is_closed(); }
server_wire_container::unq_p_resultset_wires_conteiner get_resultset_wires() { return std::move(data_channel_); }
void set_eor() { return resultset_wires_->set_eor(); }
bool is_closed() { return resultset_wires_->is_closed(); }
void defer_resultset_delete(garbage_collector& gc) {
std::unique_lock lock{mutex_};
if (resultset_wires_) {
gc.put(std::move(resultset_wires_));
}
}

private:
server_wire_container::unq_p_resultset_wires_conteiner data_channel_;
server_wire_container::unq_p_resultset_wires_conteiner resultset_wires_;

std::set<std::shared_ptr<ipc_writer>, tateyama::endpoint::common::pointer_comp<ipc_writer>> data_writers_{};
std::mutex mutex_{};
Expand All @@ -91,7 +105,7 @@ class alignas(64) ipc_response : public tateyama::endpoint::common::response {
ipc_response(std::shared_ptr<server_wire_container> server_wire, std::size_t index, std::size_t writer_count, std::function<void(void)> clean_up) :
tateyama::endpoint::common::response(index),
server_wire_(std::move(server_wire)),
garbage_collector_(server_wire_->get_garbage_collector()),
garbage_collector_(*server_wire_->get_garbage_collector()),
writer_count_(writer_count),
clean_up_(std::move(clean_up)) {
}
Expand All @@ -104,7 +118,7 @@ class alignas(64) ipc_response : public tateyama::endpoint::common::response {

private:
std::shared_ptr<server_wire_container> server_wire_;
garbage_collector* garbage_collector_;
garbage_collector& garbage_collector_;
std::size_t writer_count_;
const std::function<void(void)> clean_up_;

Expand Down

1 comment on commit 4341a51

@t-horikawa
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.