From 04dc814ab453fe36bae2de175d55f66fc4c4c2e4 Mon Sep 17 00:00:00 2001 From: ns6089 <61738816+ns6089@users.noreply.github.com> Date: Thu, 4 Jul 2024 15:49:14 +0300 Subject: [PATCH] An attempt at static flow control --- src/platform/common.h | 16 +++ src/platform/windows/display.h | 5 +- src/platform/windows/display_base.cpp | 35 +----- src/platform/windows/misc.cpp | 51 ++++++++ src/stream.cpp | 162 +++++++++++++++++++++----- src/utility.h | 3 +- 6 files changed, 210 insertions(+), 62 deletions(-) diff --git a/src/platform/common.h b/src/platform/common.h index 727586c6b01..1f67a99feda 100644 --- a/src/platform/common.h +++ b/src/platform/common.h @@ -10,6 +10,8 @@ #include #include +#include + #include "src/config.h" #include "src/logging.h" #include "src/stat_trackers.h" @@ -790,4 +792,18 @@ namespace platf { */ std::vector & supported_gamepads(input_t *input); + + struct high_precision_timer: private boost::noncopyable { + virtual ~high_precision_timer() = default; + + virtual void + sleep_for(const std::chrono::nanoseconds &duration) = 0; + + virtual + operator bool() = 0; + }; + + std::unique_ptr + create_high_precision_timer(); + } // namespace platf diff --git a/src/platform/windows/display.h b/src/platform/windows/display.h index b257abb8e85..3e035490394 100644 --- a/src/platform/windows/display.h +++ b/src/platform/windows/display.h @@ -161,9 +161,6 @@ namespace platf::dxgi { int init(const ::video::config_t &config, const std::string &display_name); - void - high_precision_sleep(std::chrono::nanoseconds duration); - capture_e capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override; @@ -184,7 +181,7 @@ namespace platf::dxgi { DXGI_FORMAT capture_format; D3D_FEATURE_LEVEL feature_level; - util::safe_ptr_v2, BOOL, CloseHandle> timer; + std::unique_ptr timer = create_high_precision_timer(); typedef enum _D3DKMT_SCHEDULINGPRIORITYCLASS { D3DKMT_SCHEDULINGPRIORITYCLASS_IDLE, ///< 
Idle priority class diff --git a/src/platform/windows/display_base.cpp b/src/platform/windows/display_base.cpp index 664fff8780f..6ab0f0c2c81 100644 --- a/src/platform/windows/display_base.cpp +++ b/src/platform/windows/display_base.cpp @@ -182,27 +182,6 @@ namespace platf::dxgi { release_frame(); } - void - display_base_t::high_precision_sleep(std::chrono::nanoseconds duration) { - if (!timer) { - BOOST_LOG(error) << "Attempting high_precision_sleep() with uninitialized timer"; - return; - } - if (duration < 0s) { - BOOST_LOG(error) << "Attempting high_precision_sleep() with negative duration"; - return; - } - if (duration > 5s) { - BOOST_LOG(error) << "Attempting high_precision_sleep() with unexpectedly large duration (>5s)"; - return; - } - - LARGE_INTEGER due_time; - due_time.QuadPart = duration.count() / -100; - SetWaitableTimer(timer.get(), &due_time, 0, nullptr, nullptr, false); - WaitForSingleObject(timer.get(), INFINITE); - } - capture_e display_base_t::capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) { auto adjust_client_frame_rate = [&]() -> DXGI_RATIONAL { @@ -268,7 +247,7 @@ namespace platf::dxgi { status = capture_e::timeout; } else { - high_precision_sleep(sleep_period); + timer->sleep_for(sleep_period); std::chrono::nanoseconds overshoot_ns = std::chrono::steady_clock::now() - sleep_target; log_sleep_overshoot(overshoot_ns); @@ -799,15 +778,9 @@ namespace platf::dxgi { << "Max Full Luminance : "sv << desc1.MaxFullFrameLuminance << " nits"sv; } - // Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+) - timer.reset(CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS)); - if (!timer) { - timer.reset(CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS)); - if (!timer) { - auto winerr = GetLastError(); - BOOST_LOG(error) << "Failed to create timer: "sv << winerr; - return -1; - } + if (!timer || !*timer) { 
+ BOOST_LOG(error) << "Uninitialized high precision timer"; + return -1; } return 0; diff --git a/src/platform/windows/misc.cpp b/src/platform/windows/misc.cpp index 32c5da98abc..82632537ee2 100644 --- a/src/platform/windows/misc.cpp +++ b/src/platform/windows/misc.cpp @@ -1803,4 +1803,55 @@ namespace platf { return output; } + + class win32_high_precision_timer: public high_precision_timer { + public: + win32_high_precision_timer() { + // Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+) + timer = CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS); + if (!timer) { + timer = CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS); + if (!timer) { + BOOST_LOG(error) << "Unable to create high_precision_timer, CreateWaitableTimerEx() failed: " << GetLastError(); + } + } + } + + ~win32_high_precision_timer() { + if (timer) CloseHandle(timer); + } + + void + sleep_for(const std::chrono::nanoseconds &duration) override { + if (!timer) { + BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with uninitialized timer"; + return; + } + if (duration < 0s) { + BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with negative duration"; + return; + } + if (duration > 5s) { + BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with unexpectedly large duration (>5s)"; + return; + } + + LARGE_INTEGER due_time; + due_time.QuadPart = duration.count() / -100; + SetWaitableTimer(timer, &due_time, 0, nullptr, nullptr, false); + WaitForSingleObject(timer, INFINITE); + } + + operator bool() override { + return timer != NULL; + } + + private: + HANDLE timer = NULL; + }; + + std::unique_ptr<high_precision_timer> + create_high_precision_timer() { + return std::make_unique<win32_high_precision_timer>(); + } } // namespace platf diff --git a/src/stream.cpp b/src/stream.cpp index 02f230b7b98..fea6c8ce4a5 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -29,6 +29,8 @@ extern "C" { #include "thread_safe.h" #include "utility.h" 
+#include "platform/common.h" + #define IDX_START_A 0 #define IDX_START_B 1 #define IDX_INVALIDATE_REF_FRAMES 2 @@ -1246,13 +1248,29 @@ namespace stream { platf::adjust_thread_priority(platf::thread_priority_e::high); stat_trackers::min_max_avg_tracker frame_processing_latency_tracker; + + stat_trackers::min_max_avg_tracker frame_send_batch_latency_tracker; + stat_trackers::min_max_avg_tracker frame_fec_latency_tracker; + stat_trackers::min_max_avg_tracker frame_network_latency_tracker; + crypto::aes_t iv(12); + auto timer = platf::create_high_precision_timer(); + if (!timer || !*timer) { + BOOST_LOG(error) << "Failed to create timer, aborting video broadcast thread"; + return; + } + + auto ratecontrol_next_frame_start = std::chrono::steady_clock::now(); + while (auto packet = packets->pop()) { if (shutdown_event->peek()) { break; } + auto frame_packet_start_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds fec_time = 0ns; + auto session = (session_t *) packet->channel_data; auto lowseq = session->video.lowseq; @@ -1381,6 +1399,21 @@ namespace stream { } try { + // Use around 80% of 1Gbps; the factors below are, in order: 1Gbps, percent, ms, packet, byte + size_t ratecontrol_packets_in_1ms = std::giga::num * 80 / 100 / 1000 / blocksize / 8; + + // Send less than 64K in a single batch. + // On Windows, batches above 64K seem to bypass SO_SNDBUF regardless of its size, + // appear in "Other I/O" and begin waiting on interrupts. + // This gives inconsistent performance so we'd rather avoid it. 
+ size_t send_batch_size = 64 * 1024 / blocksize; + + // Don't ignore the last ratecontrol group of the previous frame + auto ratecontrol_frame_start = std::max(ratecontrol_next_frame_start, std::chrono::steady_clock::now()); + + size_t ratecontrol_frame_packets_sent = 0; + size_t ratecontrol_group_packets_sent = 0; + auto blockIndex = 0; std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view &current_payload) { auto packets = (current_payload.size() + (blocksize - 1)) / blocksize; @@ -1404,10 +1437,27 @@ namespace stream { } } + auto fec_start = std::chrono::steady_clock::now(); + // If video encryption is enabled, we allocate space for the encryption header before each shard auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets, session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0); + fec_time += std::chrono::steady_clock::now() - fec_start; + + auto peer_address = session->video.peer.address(); + auto batch_info = platf::batched_send_info_t { + nullptr, + shards.prefixsize + shards.blocksize, + 0, + (uintptr_t) sock.native_handle(), + peer_address, + session->video.peer.port(), + session->localAddress, + }; + + size_t next_shard_to_send = 0; + // set FEC info now that we know for sure what our percentage will be for this frame for (auto x = 0; x < shards.size(); ++x) { auto *inspect = (video_packet_raw_t *) shards.data(x); @@ -1448,37 +1498,89 @@ namespace stream { std::copy(std::begin(iv), std::end(iv), prefix->iv); session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, prefix->tag, &iv); } - } - auto peer_address = session->video.peer.address(); - auto batch_info = platf::batched_send_info_t { - shards.shards.begin(), - shards.prefixsize + shards.blocksize, - shards.nr_shards, - (uintptr_t) sock.native_handle(), - peer_address, - session->video.peer.port(), - session->localAddress, - }; + if (x - next_shard_to_send + 1 >= send_batch_size || + x + 1 == 
shards.size()) { + // Do pacing within the frame. + // Also trigger pacing before the first send_batch() of the frame + // to account for the last send_batch() of the previous frame. + if (ratecontrol_group_packets_sent >= ratecontrol_packets_in_1ms || + ratecontrol_frame_packets_sent == 0) { + auto due = ratecontrol_frame_start + + std::chrono::duration_cast(1ms) * + ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms; + + auto now = std::chrono::steady_clock::now(); + if (now < due) { + timer->sleep_for(due - now); + } + + ratecontrol_group_packets_sent = 0; + } - // Use a batched send if it's supported on this platform - if (!platf::send_batch(batch_info)) { - // Batched send is not available, so send each packet individually - BOOST_LOG(verbose) << "Falling back to unbatched send"sv; - for (auto x = 0; x < shards.size(); ++x) { - auto send_info = platf::send_info_t { - shards.prefix(x), - shards.prefixsize + shards.blocksize, - (uintptr_t) sock.native_handle(), - peer_address, - session->video.peer.port(), - session->localAddress, - }; - - platf::send(send_info); + size_t current_batch_size = x - next_shard_to_send + 1; + batch_info.buffer = shards.data(next_shard_to_send); + batch_info.block_count = current_batch_size; + + auto batch_start_time = std::chrono::steady_clock::now(); + // Use a batched send if it's supported on this platform + if (!platf::send_batch(batch_info)) { + // Batched send is not available, so send each packet individually + BOOST_LOG(verbose) << "Falling back to unbatched send"sv; + for (auto y = 0; y < current_batch_size; y++) { + auto send_info = platf::send_info_t { + shards.prefix(next_shard_to_send + y), + shards.prefixsize + shards.blocksize, + (uintptr_t) sock.native_handle(), + peer_address, + session->video.peer.port(), + session->localAddress, + }; + + platf::send(send_info); + } + } + if (config::sunshine.min_log_level <= 1) { + // Print send_batch() latency stats to debug log every 20 seconds + auto print_info = 
[&](double min_latency, double max_latency, double avg_latency) { + auto f = stat_trackers::one_digit_after_decimal(); + BOOST_LOG(debug) << "Network: individial send_batch() latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms"; + }; + double send_batch_latency = (std::chrono::steady_clock::now() - batch_start_time).count() / 1000000.; + frame_send_batch_latency_tracker.collect_and_callback_on_interval(send_batch_latency, print_info, 20s); + } + + ratecontrol_group_packets_sent += current_batch_size; + ratecontrol_frame_packets_sent += current_batch_size; + next_shard_to_send = x + 1; } } + // remember this in case the next frame comes immediately + ratecontrol_next_frame_start = ratecontrol_frame_start + + std::chrono::duration_cast(1ms) * + ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms; + + if (config::sunshine.min_log_level <= 1) { + // Print frame FEC latency stats to debug log every 20 seconds + auto print_info = [&](double min_latency, double max_latency, double avg_latency) { + auto f = stat_trackers::one_digit_after_decimal(); + BOOST_LOG(debug) << "Network: frame FEC latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms"; + }; + double fec_latency = fec_time.count() / 1000000.; + frame_fec_latency_tracker.collect_and_callback_on_interval(fec_latency, print_info, 20s); + } + + if (config::sunshine.min_log_level <= 1) { + // Print frame network latency stats to debug log every 20 seconds + auto print_info = [&](double min_latency, double max_latency, double avg_latency) { + auto f = stat_trackers::one_digit_after_decimal(); + BOOST_LOG(debug) << "Network: frame complete network latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms"; + }; + double network_latency = (std::chrono::steady_clock::now() - frame_packet_start_time).count() / 1000000.; + 
+ frame_network_latency_tracker.collect_and_callback_on_interval(network_latency, print_info, 20s); + } + if (packet->is_idr()) { BOOST_LOG(verbose) << "Key Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv; } @@ -1628,6 +1730,14 @@ namespace stream { return -1; } + // Set video socket send buffer size (SO_SNDBUF) to 1MB + try { + ctx.video_sock.set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024)); + } + catch (...) { + BOOST_LOG(error) << "Failed to set video socket send buffer size (SO_SENDBUF)"; + } + ctx.video_sock.bind(udp::endpoint(protocol, video_port), ec); if (ec) { BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << video_port << "]: "sv << ec.message(); diff --git a/src/utility.h b/src/utility.h index 3da006f8d47..c3b4f3d813b 100644 --- a/src/utility.h +++ b/src/utility.h @@ -606,7 +606,8 @@ namespace util { return _deleter; } - explicit operator bool() const { + explicit + operator bool() const { return _p != nullptr; }