Skip to content

Commit

Permalink
Merge pull request #171 from project-tsurugi/wip/i_592
Browse files Browse the repository at this point in the history
change the method of storing worker threads that were denied TCP/IP c…
  • Loading branch information
t-horikawa authored Jan 19, 2024
2 parents ff45739 + 991a83e commit ff3cde9
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 14 deletions.
15 changes: 4 additions & 11 deletions src/tateyama/endpoint/stream/bootstrap/stream_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class stream_listener {

arrive_and_wait();
while(true) {
undertakers_.erase(std::remove_if(std::begin(undertakers_), std::end(undertakers_), [](std::unique_ptr<stream_worker>& worker){ return worker->wait_for() == std::future_status::ready; }), std::cend(undertakers_));
std::shared_ptr<stream_socket> stream{};
try {
stream = connection_socket_->accept();
Expand All @@ -128,16 +129,8 @@ class stream_listener {
if (!found) {
auto worker_decline = std::make_unique<stream_worker>(*router_, session_id, std::move(stream), status_->database_info(), true);
auto *worker_decline_ptr = worker_decline.get();
worker_decline->invoke([&]{worker_decline->run([&](){
std::unique_lock lock{mutex_};
if (auto itr = undertakers_.find(worker_decline_ptr); itr != undertakers_.end()) {
undertakers_.erase(itr);
}
});});
{
std::unique_lock lock{mutex_};
undertakers_.emplace(std::move(worker_decline));
}
undertakers_.emplace_back(std::move(worker_decline));
worker_decline_ptr->invoke([&]{worker_decline_ptr->run();});
LOG_LP(ERROR) << "the number of sessions exceeded the limit (" << workers_.size() << ")";
continue;
}
Expand Down Expand Up @@ -170,7 +163,7 @@ class stream_listener {
const std::shared_ptr<status_info::resource::bridge> status_{};
std::unique_ptr<connection_socket> connection_socket_{};
std::vector<std::unique_ptr<stream_worker>> workers_{};
std::set<std::unique_ptr<stream_worker>, tateyama::endpoint::common::pointer_comp<stream_worker>> undertakers_{};
std::vector<std::unique_ptr<stream_worker>> undertakers_{};
std::mutex mutex_{};

boost::barrier sync{2};
Expand Down
3 changes: 1 addition & 2 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

namespace tateyama::endpoint::stream::bootstrap {

void stream_worker::run(const std::function<void(void)>& clean_up)
void stream_worker::run()
{
{
std::uint16_t slot{};
Expand All @@ -43,7 +43,6 @@ void stream_worker::run(const std::function<void(void)>& clean_up)
} else {
LOG_LP(INFO) << "illegal procedure (receive a request in spite of a decline case)"; // should not reach here
}
clean_up();
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/tateyama/endpoint/stream/bootstrap/stream_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class stream_worker : public tateyama::endpoint::common::worker_common {
stream_worker& operator = (stream_worker const&) = delete;
stream_worker& operator = (stream_worker&&) = delete;

void run(const std::function<void(void)>& clean_up = [](){});
void run();
friend class stream_provider;

private:
Expand Down

0 comments on commit ff3cde9

Please sign in to comment.