Skip to content

Commit

Permalink
An attempt at static flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
ns6089 committed Jul 7, 2024
1 parent 7ca0546 commit 04dc814
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 62 deletions.
16 changes: 16 additions & 0 deletions src/platform/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <mutex>
#include <string>

#include <boost/core/noncopyable.hpp>

#include "src/config.h"
#include "src/logging.h"
#include "src/stat_trackers.h"
Expand Down Expand Up @@ -790,4 +792,18 @@ namespace platf {
*/
std::vector<supported_gamepad_t> &
supported_gamepads(input_t *input);

/**
 * @brief Platform-neutral interface for a timer that can put the calling thread to sleep.
 *
 * Instances are obtained through create_high_precision_timer(). The type is
 * non-copyable (it privately inherits boost::noncopyable), so it is handled
 * through std::unique_ptr.
 */
struct high_precision_timer: private boost::noncopyable {
// Virtual so that implementations are destroyed correctly through a base pointer.
virtual ~high_precision_timer() = default;

/**
 * @brief Sleep for the requested duration.
 * @param duration Requested sleep time with nanosecond resolution.
 */
virtual void
sleep_for(const std::chrono::nanoseconds &duration) = 0;

/**
 * @brief Report whether the timer is usable.
 * @return `true` if the timer was initialized successfully, `false` otherwise.
 * @note Callers test both the owning pointer and this conversion (`!timer || !*timer`).
 */
virtual
operator bool() = 0;
};

/**
 * @brief Create the platform's high_precision_timer implementation.
 * @return Owning pointer to the timer. Callers check both the pointer and the
 *         timer's `operator bool()` before use, since creation of the
 *         underlying OS timer can fail.
 */
std::unique_ptr<high_precision_timer>
create_high_precision_timer();

} // namespace platf
5 changes: 1 addition & 4 deletions src/platform/windows/display.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,6 @@ namespace platf::dxgi {
int
init(const ::video::config_t &config, const std::string &display_name);

void
high_precision_sleep(std::chrono::nanoseconds duration);

capture_e
capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) override;

Expand All @@ -184,7 +181,7 @@ namespace platf::dxgi {
DXGI_FORMAT capture_format;
D3D_FEATURE_LEVEL feature_level;

util::safe_ptr_v2<std::remove_pointer_t<HANDLE>, BOOL, CloseHandle> timer;
std::unique_ptr<high_precision_timer> timer = create_high_precision_timer();

typedef enum _D3DKMT_SCHEDULINGPRIORITYCLASS {
D3DKMT_SCHEDULINGPRIORITYCLASS_IDLE, ///< Idle priority class
Expand Down
35 changes: 4 additions & 31 deletions src/platform/windows/display_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,27 +182,6 @@ namespace platf::dxgi {
release_frame();
}

void
display_base_t::high_precision_sleep(std::chrono::nanoseconds duration) {
if (!timer) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with uninitialized timer";
return;
}
if (duration < 0s) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with negative duration";
return;
}
if (duration > 5s) {
BOOST_LOG(error) << "Attempting high_precision_sleep() with unexpectedly large duration (>5s)";
return;
}

LARGE_INTEGER due_time;
due_time.QuadPart = duration.count() / -100;
SetWaitableTimer(timer.get(), &due_time, 0, nullptr, nullptr, false);
WaitForSingleObject(timer.get(), INFINITE);
}

capture_e
display_base_t::capture(const push_captured_image_cb_t &push_captured_image_cb, const pull_free_image_cb_t &pull_free_image_cb, bool *cursor) {
auto adjust_client_frame_rate = [&]() -> DXGI_RATIONAL {
Expand Down Expand Up @@ -268,7 +247,7 @@ namespace platf::dxgi {
status = capture_e::timeout;
}
else {
high_precision_sleep(sleep_period);
timer->sleep_for(sleep_period);
std::chrono::nanoseconds overshoot_ns = std::chrono::steady_clock::now() - sleep_target;
log_sleep_overshoot(overshoot_ns);

Expand Down Expand Up @@ -799,15 +778,9 @@ namespace platf::dxgi {
<< "Max Full Luminance : "sv << desc1.MaxFullFrameLuminance << " nits"sv;
}

// Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+)
timer.reset(CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS));
if (!timer) {
timer.reset(CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS));
if (!timer) {
auto winerr = GetLastError();
BOOST_LOG(error) << "Failed to create timer: "sv << winerr;
return -1;
}
if (!timer || !*timer) {
BOOST_LOG(error) << "Uninitialized high precision timer";
return -1;
}

return 0;
Expand Down
51 changes: 51 additions & 0 deletions src/platform/windows/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1803,4 +1803,55 @@ namespace platf {

return output;
}

class win32_high_precision_timer: public high_precision_timer {
public:
win32_high_precision_timer() {
// Use CREATE_WAITABLE_TIMER_HIGH_RESOLUTION if supported (Windows 10 1809+)
timer = CreateWaitableTimerEx(nullptr, nullptr, CREATE_WAITABLE_TIMER_HIGH_RESOLUTION, TIMER_ALL_ACCESS);
if (!timer) {
timer = CreateWaitableTimerEx(nullptr, nullptr, 0, TIMER_ALL_ACCESS);
if (!timer) {
BOOST_LOG(error) << "Unable to create high_precision_timer, CreateWaitableTimerEx() failed: " << GetLastError();
}
}
}

~win32_high_precision_timer() {
if (timer) CloseHandle(timer);
}

void
sleep_for(const std::chrono::nanoseconds &duration) override {
if (!timer) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with uninitialized timer";
return;
}
if (duration < 0s) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with negative duration";
return;
}
if (duration > 5s) {
BOOST_LOG(error) << "Attempting high_precision_timer::sleep_for() with unexpectedly large duration (>5s)";
return;
}

LARGE_INTEGER due_time;
due_time.QuadPart = duration.count() / -100;
SetWaitableTimer(timer, &due_time, 0, nullptr, nullptr, false);
WaitForSingleObject(timer, INFINITE);
}

operator bool() override {
return timer != NULL;
}

private:
HANDLE timer = NULL;
};

std::unique_ptr<high_precision_timer>
create_high_precision_timer() {
return std::make_unique<win32_high_precision_timer>();
}
} // namespace platf
162 changes: 136 additions & 26 deletions src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extern "C" {
#include "thread_safe.h"
#include "utility.h"

#include "platform/common.h"

#define IDX_START_A 0
#define IDX_START_B 1
#define IDX_INVALIDATE_REF_FRAMES 2
Expand Down Expand Up @@ -1246,13 +1248,29 @@ namespace stream {
platf::adjust_thread_priority(platf::thread_priority_e::high);

stat_trackers::min_max_avg_tracker<uint16_t> frame_processing_latency_tracker;

stat_trackers::min_max_avg_tracker<double> frame_send_batch_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_fec_latency_tracker;
stat_trackers::min_max_avg_tracker<double> frame_network_latency_tracker;

crypto::aes_t iv(12);

auto timer = platf::create_high_precision_timer();
if (!timer || !*timer) {
BOOST_LOG(error) << "Failed to create timer, aborting video broadcast thread";
return;
}

auto ratecontrol_next_frame_start = std::chrono::steady_clock::now();

while (auto packet = packets->pop()) {
if (shutdown_event->peek()) {
break;
}

auto frame_packet_start_time = std::chrono::steady_clock::now();
std::chrono::nanoseconds fec_time = 0ns;

auto session = (session_t *) packet->channel_data;
auto lowseq = session->video.lowseq;

Expand Down Expand Up @@ -1381,6 +1399,21 @@ namespace stream {
}

try {
// Use around 80% of 1Gbps: 1Gbps * 80 percent / 1000 (ms per second) / blocksize (bytes per packet) / 8 (bits per byte)
size_t ratecontrol_packets_in_1ms = std::giga::num * 80 / 100 / 1000 / blocksize / 8;

// Send less than 64K in a single batch.
// On Windows, batches above 64K seem to bypass SO_SNDBUF regardless of its size,
// appear in "Other I/O" and begin waiting on interrupts.
// This gives inconsistent performance so we'd rather avoid it.
size_t send_batch_size = 64 * 1024 / blocksize;

// Don't ignore the last ratecontrol group of the previous frame
auto ratecontrol_frame_start = std::max(ratecontrol_next_frame_start, std::chrono::steady_clock::now());

size_t ratecontrol_frame_packets_sent = 0;
size_t ratecontrol_group_packets_sent = 0;

auto blockIndex = 0;
std::for_each(fec_blocks_begin, fec_blocks_end, [&](std::string_view &current_payload) {
auto packets = (current_payload.size() + (blocksize - 1)) / blocksize;
Expand All @@ -1404,10 +1437,27 @@ namespace stream {
}
}

auto fec_start = std::chrono::steady_clock::now();

// If video encryption is enabled, we allocate space for the encryption header before each shard
auto shards = fec::encode(current_payload, blocksize, fecPercentage, session->config.minRequiredFecPackets,
session->video.cipher ? sizeof(video_packet_enc_prefix_t) : 0);

fec_time += std::chrono::steady_clock::now() - fec_start;

auto peer_address = session->video.peer.address();
auto batch_info = platf::batched_send_info_t {
nullptr,
shards.prefixsize + shards.blocksize,
0,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};

size_t next_shard_to_send = 0;

// set FEC info now that we know for sure what our percentage will be for this frame
for (auto x = 0; x < shards.size(); ++x) {
auto *inspect = (video_packet_raw_t *) shards.data(x);
Expand Down Expand Up @@ -1448,37 +1498,89 @@ namespace stream {
std::copy(std::begin(iv), std::end(iv), prefix->iv);
session->video.cipher->encrypt(std::string_view { (char *) inspect, (size_t) blocksize }, prefix->tag, &iv);
}
}

auto peer_address = session->video.peer.address();
auto batch_info = platf::batched_send_info_t {
shards.shards.begin(),
shards.prefixsize + shards.blocksize,
shards.nr_shards,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};
if (x - next_shard_to_send + 1 >= send_batch_size ||
x + 1 == shards.size()) {
// Do pacing within the frame.
// Also trigger pacing before the first send_batch() of the frame
// to account for the last send_batch() of the previous frame.
if (ratecontrol_group_packets_sent >= ratecontrol_packets_in_1ms ||
ratecontrol_frame_packets_sent == 0) {
auto due = ratecontrol_frame_start +
std::chrono::duration_cast<std::chrono::nanoseconds>(1ms) *
ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;

auto now = std::chrono::steady_clock::now();
if (now < due) {
timer->sleep_for(due - now);
}

ratecontrol_group_packets_sent = 0;
}

// Use a batched send if it's supported on this platform
if (!platf::send_batch(batch_info)) {
// Batched send is not available, so send each packet individually
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto x = 0; x < shards.size(); ++x) {
auto send_info = platf::send_info_t {
shards.prefix(x),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};

platf::send(send_info);
size_t current_batch_size = x - next_shard_to_send + 1;
batch_info.buffer = shards.data(next_shard_to_send);
batch_info.block_count = current_batch_size;

auto batch_start_time = std::chrono::steady_clock::now();
// Use a batched send if it's supported on this platform
if (!platf::send_batch(batch_info)) {
// Batched send is not available, so send each packet individually
BOOST_LOG(verbose) << "Falling back to unbatched send"sv;
for (auto y = 0; y < current_batch_size; y++) {
auto send_info = platf::send_info_t {
shards.prefix(next_shard_to_send + y),
shards.prefixsize + shards.blocksize,
(uintptr_t) sock.native_handle(),
peer_address,
session->video.peer.port(),
session->localAddress,
};

platf::send(send_info);
}
}
if (config::sunshine.min_log_level <= 1) {
// Print send_batch() latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: individial send_batch() latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double send_batch_latency = (std::chrono::steady_clock::now() - batch_start_time).count() / 1000000.;
frame_send_batch_latency_tracker.collect_and_callback_on_interval(send_batch_latency, print_info, 20s);
}

ratecontrol_group_packets_sent += current_batch_size;
ratecontrol_frame_packets_sent += current_batch_size;
next_shard_to_send = x + 1;
}
}

// remember this in case the next frame comes immediately
ratecontrol_next_frame_start = ratecontrol_frame_start +
std::chrono::duration_cast<std::chrono::nanoseconds>(1ms) *
ratecontrol_frame_packets_sent / ratecontrol_packets_in_1ms;

if (config::sunshine.min_log_level <= 1) {
// Print frame FEC latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame FEC latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double fec_latency = fec_time.count() / 1000000.;
frame_fec_latency_tracker.collect_and_callback_on_interval(fec_latency, print_info, 20s);
}

if (config::sunshine.min_log_level <= 1) {
// Print frame network latency stats to debug log every 20 seconds
auto print_info = [&](double min_latency, double max_latency, double avg_latency) {
auto f = stat_trackers::one_digit_after_decimal();
BOOST_LOG(debug) << "Network: frame complete network latency (min/max/avg): " << f % min_latency << "ms/" << f % max_latency << "ms/" << f % avg_latency << "ms";
};
double network_latency = (std::chrono::steady_clock::now() - frame_packet_start_time).count() / 1000000.;
frame_network_latency_tracker.collect_and_callback_on_interval(network_latency, print_info, 20s);
}

if (packet->is_idr()) {
BOOST_LOG(verbose) << "Key Frame ["sv << packet->frame_index() << "] :: send ["sv << shards.size() << "] shards..."sv;
}
Expand Down Expand Up @@ -1628,6 +1730,14 @@ namespace stream {
return -1;
}

// Set video socket send buffer size (SO_SNDBUF) to 1MB
try {
ctx.video_sock.set_option(boost::asio::socket_base::send_buffer_size(1024 * 1024));
}
catch (...) {
BOOST_LOG(error) << "Failed to set video socket send buffer size (SO_SENDBUF)";
}

ctx.video_sock.bind(udp::endpoint(protocol, video_port), ec);
if (ec) {
BOOST_LOG(fatal) << "Couldn't bind Video server to port ["sv << video_port << "]: "sv << ec.message();
Expand Down
3 changes: 2 additions & 1 deletion src/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,8 @@ namespace util {
return _deleter;
}

explicit operator bool() const {
explicit
operator bool() const {
return _p != nullptr;
}

Expand Down

0 comments on commit 04dc814

Please sign in to comment.