Skip to content

Commit

Permalink
use dev_idle_work_interval parameter, specifying idle work interval f…
Browse files Browse the repository at this point in the history
…or stream listener
  • Loading branch information
t-horikawa committed Feb 27, 2024
1 parent 11c9db2 commit bdead83
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 7 deletions.
14 changes: 13 additions & 1 deletion src/tateyama/endpoint/stream/bootstrap/stream_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,15 @@ class stream_listener {
}
auto threads = threads_opt.value();

auto timeout_opt = endpoint_config->get<std::size_t>("dev_idle_work_interval");
if (!timeout_opt) {
LOG_LP(ERROR) << "cannot find dev_idle_work_interval at the section in the configuration";
exit(1);
}
auto timeout = timeout_opt.value();

// connection stream
connection_socket_ = std::make_unique<connection_socket>(port);
connection_socket_ = std::make_unique<connection_socket>(port, timeout);

// worker objects
workers_.resize(threads);
Expand All @@ -90,6 +97,11 @@ class stream_listener {
LOG(INFO) << tateyama::endpoint::common::stream_endpoint_config_prefix
<< "threads: " << threads_opt.value() << ", "
<< "the number of maximum sessions.";
if (timeout != 1000) {
LOG(INFO) << tateyama::endpoint::common::stream_endpoint_config_prefix
<< "dev_idle_work_interval has changed to: " << timeout_opt.value() << ", "
<< "the idle work interval for stream listener in millisecond.";
}
}
~stream_listener() {
connection_socket_->close();
Expand Down
14 changes: 8 additions & 6 deletions src/tateyama/endpoint/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ class connection_socket
* @brief Construct a new object.
*/
connection_socket() = delete;
connection_socket(std::uint32_t port, std::size_t socket_limit) : socket_limit_(socket_limit) {
connection_socket(std::uint32_t port, std::size_t timeout, std::size_t socket_limit) : socket_limit_(socket_limit), timeout_(timeout) {
// create a pipe
if (pipe(&pair_[0]) != 0) {
throw std::runtime_error("cannot create a pipe");
Expand All @@ -383,7 +383,8 @@ class connection_socket
// listen the port
listen(socket_, SOMAXCONN);
}
explicit connection_socket(std::uint32_t port) : connection_socket(port, default_socket_limit) {}
connection_socket(std::uint32_t port, std::size_t timeout) : connection_socket(port, timeout, default_socket_limit) {}
explicit connection_socket(std::uint32_t port) : connection_socket(port, 1000, default_socket_limit) {} // for tests
~connection_socket() = default;

/**
Expand All @@ -397,10 +398,10 @@ class connection_socket
std::shared_ptr<stream_socket> accept(const std::function<void(void)>& cleanup = [](){} ) {
cleanup();

struct timeval tv{};
tv.tv_sec = 1; // 1(S)
tv.tv_usec = 0;
while (true) {
struct timeval tv{};
tv.tv_sec = static_cast<std::int64_t>(timeout_ / 1000); // sec
tv.tv_usec = static_cast<std::int64_t>((timeout_ % 1000) * 1000); // usec
FD_ZERO(&fds_); // NOLINT
if (is_socket_available()) {
FD_SET(socket_, &fds_); // NOLINT
Expand Down Expand Up @@ -452,7 +453,8 @@ class connection_socket
std::atomic_uint64_t num_open_{0};
std::mutex num_mutex_{};
std::condition_variable num_condition_{};

std::size_t timeout_;

friend class stream_socket;

[[nodiscard]] bool is_socket_available() {
Expand Down
1 change: 1 addition & 0 deletions test/tateyama/utils/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static constexpr std::string_view default_configuration_for_tests { // NOLINT
"[stream_endpoint]\n"
"port=12345\n"
"threads=104\n"
"dev_idle_work_interval=1000\n"

"[fdw]\n"
"name=tsurugi\n"
Expand Down

0 comments on commit bdead83

Please sign in to comment.