Skip to content

Commit

Permalink
make the operations on responses_ in endpoint::worker_common thread-safe
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Feb 27, 2024
1 parent 2135ca0 commit 8affb96
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
16 changes: 13 additions & 3 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <functional>
#include <map>
#include <vector>
#include <mutex>

#include <tateyama/status.h>
#include <tateyama/api/server/request.h>
Expand Down Expand Up @@ -198,25 +199,34 @@ class worker_common {
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, ss.str());
return false;
}
if (auto itr = responses_.find(slot); itr != responses_.end()) {
if (auto ptr = itr->second.lock(); ptr) {
ptr->set_cancel();
{
std::lock_guard<std::mutex> lock(mtx_responses_);
if (auto itr = responses_.find(slot); itr != responses_.end()) {
if (auto ptr = itr->second.lock(); ptr) {
ptr->set_cancel();
}
}
}
return true;
}

void register_response(std::size_t slot, const std::shared_ptr<tateyama::endpoint::common::response>& response) noexcept {
std::lock_guard<std::mutex> lock(mtx_responses_);
if (auto itr = responses_.find(slot); itr != responses_.end()) {
responses_.erase(itr);
}
responses_.emplace(slot, response);
}
void remove_response(std::size_t slot) noexcept {
std::lock_guard<std::mutex> lock(mtx_responses_);
responses_.erase(slot);
}

private:
tateyama::session::session_variable_set session_variable_set_;
const std::shared_ptr<tateyama::session::resource::session_context_impl> session_context_;
std::map<std::size_t, std::weak_ptr<tateyama::endpoint::common::response>> responses_{};
std::mutex mtx_responses_{};

std::string_view connection_label(connection_type con) {
switch (con) {
Expand Down
4 changes: 0 additions & 4 deletions src/tateyama/endpoint/ipc/ipc_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ class alignas(64) ipc_response : public tateyama::endpoint::common::response {
session_id_ = id;
}

[[nodiscard]] bool check_cancel() const override {
return false;
}

private:
std::shared_ptr<server_wire_container> server_wire_;
std::size_t index_;
Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ class connection_socket
std::mutex num_mutex_{};
std::condition_variable num_condition_{};
std::size_t timeout_;

friend class stream_socket;

[[nodiscard]] bool is_socket_available() {
Expand Down
4 changes: 0 additions & 4 deletions src/tateyama/endpoint/stream/stream_response.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,6 @@ class alignas(64) stream_response : public tateyama::endpoint::common::response
session_id_ = id;
}

[[nodiscard]] bool check_cancel() const override {
return false;
}

private:
std::shared_ptr<stream_socket> stream_;
std::uint16_t index_;
Expand Down
10 changes: 5 additions & 5 deletions src/tateyama/proto/endpoint/request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ message Handshake {
WireInformation wire_information = 11;
}

// cancel operation.
message Cancel {
// no special properties.
}

// client information
message ClientInformation {
// the connection label.
Expand Down Expand Up @@ -81,3 +76,8 @@ message WireInformation {
uint64 maximum_concurrent_result_sets = 1;
}
}

// cancel operation.
message Cancel {
// no special properties.
}

0 comments on commit 8affb96

Please sign in to comment.