diff --git a/src/tateyama/transport/client_wire.h b/src/tateyama/transport/client_wire.h index 06970ad..0c87f8b 100644 --- a/src/tateyama/transport/client_wire.h +++ b/src/tateyama/transport/client_wire.h @@ -109,14 +109,14 @@ class session_wire_container public: wire_container() = default; wire_container(unidirectional_message_wire* wire, char* bip_buffer) noexcept : wire_(wire), bip_buffer_(bip_buffer) {}; - message_header peep(bool wait = false) { - return wire_->peep(bip_buffer_, wait); + message_header peep() { + return wire_->peep(bip_buffer_); } void write(const std::string& s, message_header::index_type index) { wire_->write(bip_buffer_, s.data(), message_header(index, s.length())); } void disconnect() { - wire_->write(bip_buffer_, nullptr, message_header(message_header::termination_request, 0)); + wire_->terminate(); } private: @@ -230,13 +230,15 @@ class connection_container } std::string connect() { - auto& q = get_connection_queue(); - auto id = q.request(); // connect - auto session_id = q.wait(id); // wait - std::string sn{db_name_}; - sn += "-"; - sn += std::to_string(session_id); - return sn; + auto& que = get_connection_queue(); + auto rid = que.request(); // connect + if (auto session_id = que.wait(rid); session_id != tateyama::common::wire::connection_queue::session_id_indicating_error) { // wait + std::string name{db_name_}; + name += "-"; + name += std::to_string(session_id); + return name; + } + throw std::runtime_error("IPC connection establishment failure"); } private: diff --git a/src/tateyama/transport/wire.h b/src/tateyama/transport/wire.h index 149721b..598391a 100644 --- a/src/tateyama/transport/wire.h +++ b/src/tateyama/transport/wire.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -42,7 +43,7 @@ class message_header { public: using length_type = std::uint32_t; using index_type = std::uint16_t; - static constexpr index_type termination_request = 0xffff; + static constexpr index_type terminate_request = 0xffff; static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type); @@ -224,40 +225,6 @@ class simple_wire } } - /** - * @brief write response message in the response wire, which is used by endpoint IF - */ - void write(char* base, const char* from, T header) { - std::size_t length = header.get_length() + T::size; - auto msg_length = min(length, capacity_); - if (msg_length > room()) { wait_to_write(msg_length); } - write_in_buffer(base, buffer_address(base, pushed_.load()), header.get_buffer(), T::size); - if (msg_length > T::size) { - write_in_buffer(base, buffer_address(base, pushed_.load() + T::size), from, msg_length - T::size); - } - pushed_.fetch_add(msg_length); - length -= msg_length; - from += (msg_length - T::size); // NOLINT - std::atomic_thread_fence(std::memory_order_acq_rel); - if (wait_for_read_) { - boost::interprocess::scoped_lock lock(m_mutex_); - c_empty_.notify_one(); - } - while (length > 0) { - msg_length = min(length, capacity_); - if (msg_length > room()) { wait_to_write(msg_length); } - write_in_buffer(base, buffer_address(base, pushed_.load()), from, msg_length); - pushed_.fetch_add(msg_length); - length -= msg_length; - from += msg_length; // NOLINT - std::atomic_thread_fence(std::memory_order_acq_rel); - if (wait_for_read_) { - boost::interprocess::scoped_lock lock(m_mutex_); - c_empty_.notify_one(); - } - } - } - /** * @brief dispose the message in the queue at read_point that has completed read and is no longer needed * used by endpoint IF @@ -298,11 +265,43 @@ class simple_wire [[nodiscard]] const char* read_address(const char* base, std::size_t offset) const { return base + index(poped_.load() + offset); } //NOLINT [[nodiscard]] const char* read_address(const char* base) const { return base + index(poped_.load()); } //NOLINT - void wait_to_write(std::size_t length) { + void write(char* base, const char* from, T header, std::atomic_bool& closed) { + std::size_t length = header.get_length() + T::size; + auto msg_length = min(length, capacity_); + if (msg_length > room() && !closed.load()) { wait_to_write(msg_length, closed); } + if (closed.load()) { return; } + write_in_buffer(base, buffer_address(base, pushed_.load()), header.get_buffer(), T::size); + if (msg_length > T::size) { + write_in_buffer(base, buffer_address(base, pushed_.load() + T::size), from, msg_length - T::size); + } + pushed_.fetch_add(msg_length); + length -= msg_length; + from += (msg_length - T::size); // NOLINT + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + while (length > 0) { + msg_length = min(length, capacity_); + if (msg_length > room() && !closed.load()) { wait_to_write(msg_length, closed); } + if (closed.load()) { return; } + write_in_buffer(base, buffer_address(base, pushed_.load()), from, msg_length); + pushed_.fetch_add(msg_length); + length -= msg_length; + from += msg_length; // NOLINT + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + } + } + void wait_to_write(std::size_t length, std::atomic_bool& closed) { boost::interprocess::scoped_lock lock(m_mutex_); wait_for_write_ = true; std::atomic_thread_fence(std::memory_order_acq_rel); - c_full_.wait(lock, [this, length](){ return room() >= length; }); + c_full_.wait(lock, [this, length, &closed](){ return room() >= length || closed.load(); }); wait_for_write_ = false; } void write_in_buffer(char *base, char* top, const char* from, std::size_t length) noexcept { @@ -372,35 +371,73 @@ inline static std::int64_t n_cap(std::int64_t timeout) { // for request class unidirectional_message_wire : public simple_wire { + constexpr static std::size_t watch_interval = 2; public: unidirectional_message_wire(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::size_t capacity) : simple_wire(managed_shm_ptr, capacity) {} /** - * @brief peep the current header. + * @brief wait a request message arives and peep the current header. + * @return the essage_header if request message has been received, for normal reception of request message. + * otherwise, dummy request message whose length is 0 and index is message_header::termination_request for termination request + * @throws std::runtime_error when timeout occures. */ - message_header peep(const char* base, bool wait_flag = false) { + message_header peep(const char* base) { while (true) { - if(stored() >= message_header::size || termination_requested_.load()) { - break; + if(stored() >= message_header::size) { + copy_header(base); + return header_received_; } - if (wait_flag) { - boost::interprocess::scoped_lock lock(m_mutex_); - wait_for_read_ = true; - std::atomic_thread_fence(std::memory_order_acq_rel); - c_empty_.wait(lock, [this](){ return (stored() >= message_header::size) || termination_requested_.load(); }); + if (termination_requested_.load()) { + termination_requested_.store(false); + return {message_header::terminate_request, 0}; + } + if (onetime_notification_.load()) { + throw std::runtime_error("received shutdown request from outside the communication partner"); + } + boost::interprocess::scoped_lock lock(m_mutex_); + wait_for_read_ = true; + std::atomic_thread_fence(std::memory_order_acq_rel); + if (!c_empty_.timed_wait(lock, + boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))), + [this](){ return (stored() >= message_header::size) || termination_requested_.load() || onetime_notification_.load(); })) { wait_for_read_ = false; - } else { - if (stored() < message_header::size) { return {}; } + throw std::runtime_error("request has not been received within the specified time"); } + wait_for_read_ = false; } - if (!termination_requested_.load()) { - copy_header(base); - } else { - header_received_ = message_header(message_header::termination_request, 0); + } + /** + * @brief wake up the worker immediately. + */ + void notify() { + onetime_notification_.store(true); + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + } + /** + * @brief close the request wire, used by the server. + */ + void close() { + closed_.store(true); + std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_write_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_full_.notify_one(); } - return header_received_; } + /** + * @brief write request message + * @param base the base address of the request wire + * @param from the request message to be written in the request wire + * @param header the header of the request message + */ + void write(char* base, const char* from, message_header header) { + simple_wire::write(base, from, header, closed_); + } /** * @brief wake up the worker thread waiting for request arrival, supposed to be used in server termination. */ @@ -412,23 +449,19 @@ class unidirectional_message_wire : public simple_wire { c_empty_.notify_one(); } } - /** - * @brief check if an termination request has been made - * @retrun true if terminate request has been made - */ - [[nodiscard]] bool terminate_requested() { - return termination_requested_.load(); - } private: std::atomic_bool termination_requested_{}; + std::atomic_bool onetime_notification_{}; + std::atomic_bool closed_{}; }; // for response class unidirectional_response_wire : public simple_wire { - constexpr static std::size_t watch_interval = 5; public: + constexpr static std::size_t watch_interval = 5; + unidirectional_response_wire(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::size_t capacity) : simple_wire(managed_shm_ptr, capacity) {} /** @@ -440,19 +473,19 @@ class unidirectional_response_wire : public simple_wire { } while (true) { - if (closed_.load()) { - header_received_ = response_header(0, 0, 0); - return header_received_; - } if(stored() >= response_header::size) { break; } + if (closed_.load() || shutdown_.load()) { + header_received_ = response_header(0, 0, 0); + return header_received_; + } { boost::interprocess::scoped_lock lock(m_mutex_); wait_for_read_ = true; std::atomic_thread_fence(std::memory_order_acq_rel); - if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))), [this](){ return (stored() >= response_header::size) || closed_.load(); })) { + if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))), [this](){ return (stored() >= response_header::size) || closed_.load() || shutdown_.load(); })) { wait_for_read_ = false; throw std::runtime_error("response has not been received within the specified time"); } @@ -480,10 +513,43 @@ class unidirectional_response_wire : public simple_wire { [[nodiscard]] response_header::msg_type get_type() const { return header_received_.get_type(); } - + /** + * @brief close the response wire, used by the client. + */ void close() { closed_.store(true); std::atomic_thread_fence(std::memory_order_acq_rel); + if (wait_for_write_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_full_.notify_one(); + } + if (wait_for_read_) { + boost::interprocess::scoped_lock lock(m_mutex_); + c_empty_.notify_one(); + } + } + /** + * @brief check the session has been shut down + * @return true if the session has been shut down + */ + [[nodiscard]] bool check_shutdown() const noexcept { + return shutdown_.load(); + } + + /** + * @brief write response message + * @param base the base address of the response wire + * @param from the response message to be written in the response wire + * @param header the header of the response message + */ + void write(char* base, const char* from, response_header header) { + simple_wire::write(base, from, header, closed_); + } + /** + * @brief notify client of the client of the shutdown + */ + void notify_shutdown() { + shutdown_.store(true); if (wait_for_read_) { boost::interprocess::scoped_lock lock(m_mutex_); c_empty_.notify_one(); @@ -492,6 +558,7 @@ class unidirectional_response_wire : public simple_wire { private: std::atomic_bool closed_{}; + std::atomic_bool shutdown_{}; }; @@ -897,18 +964,22 @@ class status_provider { status_provider(boost::interprocess::managed_shared_memory* managed_shm_ptr, std::string_view file) : mutex_file_(file, managed_shm_ptr->get_segment_manager()) { } - [[nodiscard]] bool is_alive() { - int fd = open(mutex_file_.c_str(), O_WRONLY); // NOLINT + [[nodiscard]] std::string is_alive() { + int fd = open(mutex_file_.c_str(), O_RDONLY); // NOLINT if (fd < 0) { - return false; + std::stringstream ss{}; + ss << "cannot open the lock file (" << mutex_file_.c_str() << ")"; + return ss.str(); } if (flock(fd, LOCK_EX | LOCK_NB) == 0) { // NOLINT flock(fd, LOCK_UN); close(fd); - return false; + std::stringstream ss{}; + ss << "the lock file (" << mutex_file_.c_str() << ") is not locked, possibly due to server process lost"; + return ss.str(); } close(fd); - return true; + return {}; } private: @@ -923,6 +994,7 @@ class connection_queue constexpr static const char* name = "connection_queue"; class index_queue { + constexpr static std::size_t watch_interval = 5; using long_allocator = boost::interprocess::allocator; public: @@ -953,13 +1025,20 @@ class connection_queue } } } - void wait(std::atomic_bool& terminate) { + [[nodiscard]] bool wait(std::atomic_bool& terminate) { boost::interprocess::scoped_lock lock(mutex_); std::atomic_thread_fence(std::memory_order_acq_rel); - condition_.wait(lock, [this, &terminate](){ return (pushed_.load() > poped_.load()) || terminate.load(); }); + return condition_.timed_wait(lock, + boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))), + [this, &terminate](){ return (pushed_.load() > poped_.load()) || terminate.load(); }); + } + // thread unsafe (assume single listener thread) + void pop() { + poped_.fetch_add(1); } - [[nodiscard]] std::size_t pop() { - return queue_.at(index(poped_.fetch_add(1))); + // thread unsafe (assume single listener thread) + [[nodiscard]] std::size_t front() { + return queue_.at(index(poped_.load())); } void notify() { condition_.notify_one(); @@ -996,11 +1075,11 @@ class connection_queue void accept(std::size_t session_id) { session_id_ = session_id; - std::atomic_thread_fence(std::memory_order_acq_rel); - { - boost::interprocess::scoped_lock lock(m_accepted_); - c_accepted_.notify_one(); - } + notify(); + } + void reject() { + session_id_ = session_id_indicating_error; + notify(); } [[nodiscard]] std::size_t wait(std::int64_t timeout = 0) { std::atomic_thread_fence(std::memory_order_acq_rel); @@ -1031,9 +1110,18 @@ class connection_queue boost::interprocess::interprocess_mutex m_accepted_{}; boost::interprocess::interprocess_condition c_accepted_{}; std::size_t session_id_{}; + + void notify() { + std::atomic_thread_fence(std::memory_order_acq_rel); + { + boost::interprocess::scoped_lock lock(m_accepted_); + c_accepted_.notify_one(); + } + } }; using element_allocator = boost::interprocess::allocator; + constexpr static std::size_t session_id_indicating_error = UINT64_MAX; /** * @brief Construct a new object. @@ -1058,23 +1146,37 @@ class connection_queue } std::size_t wait(std::size_t rid, std::int64_t timeout = 0) { auto& entry = v_requested_.at(rid); - auto rtnv = entry.wait(timeout); - entry.reuse(); - return rtnv; + try { + auto rtnv = entry.wait(timeout); + entry.reuse(); + return rtnv; + } catch (std::runtime_error &ex) { + entry.reuse(); + throw ex; + } } bool check(std::size_t rid) { return v_requested_.at(rid).check(); } - std::size_t listen() { - q_requested_.wait(terminate_); - return ++session_id_; + if (q_requested_.wait(terminate_)) { + return ++session_id_; + } + return 0; + } + std::size_t slot() { + return q_requested_.front(); + } + // either accept() or reject() must be called + void accept(std::size_t sid, std::size_t session_id) { + q_requested_.pop(); + v_requested_.at(sid).accept(session_id); } - std::size_t accept(std::size_t session_id) { - std::size_t sid = q_requested_.pop(); - auto& request = v_requested_.at(sid); - request.accept(session_id); - return sid; + // either accept() or reject() must be called + void reject(std::size_t sid) { + q_requested_.pop(); + v_requested_.at(sid).reject(); + q_free_.push(sid); } void disconnect(std::size_t rid) { q_free_.push(rid); diff --git a/test/ogawayama/stub/endpoint.h b/test/ogawayama/stub/endpoint.h index f1aea90..88c1b47 100644 --- a/test/ogawayama/stub/endpoint.h +++ b/test/ogawayama/stub/endpoint.h @@ -111,9 +111,9 @@ class endpoint { } void operator()() { while(true) { - auto h = wire_->get_request_wire()->peep(true); + auto h = wire_->get_request_wire()->peep(); auto index = h.get_idx(); - if (h.get_length() == 0 && index == tateyama::common::wire::message_header::termination_request) { break; } + if (h.get_length() == 0 && index == tateyama::common::wire::message_header::terminate_request) { break; } std::string message; message.resize(h.get_length()); wire_->get_request_wire()->read(message.data()); @@ -272,7 +272,8 @@ class endpoint { session_name += "-"; session_name += std::to_string(session_id); auto wire = std::make_unique(session_name); - std::size_t index = connection_queue.accept(session_id); + std::size_t index = connection_queue.slot(); + connection_queue.accept(index, session_id); try { std::unique_lock lk(mutex_); worker_ = std::make_unique(session_name, std::move(wire), [&connection_queue, index](){ connection_queue.disconnect(index); }); diff --git a/test/ogawayama/stub/server_wires_impl.h b/test/ogawayama/stub/server_wires_impl.h index 920880f..5d6a8cb 100644 --- a/test/ogawayama/stub/server_wires_impl.h +++ b/test/ogawayama/stub/server_wires_impl.h @@ -284,8 +284,8 @@ class server_wire_container wire_ = wire; bip_buffer_ = bip_buffer; } - message_header peep(bool wait = false) { - return wire_->peep(bip_buffer_, wait); + message_header peep() { + return wire_->peep(bip_buffer_); } std::string_view payload() { return wire_->payload(bip_buffer_);