Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream)!: remove limit on concurrent sessions and allow quitting apps with active sessions #3325

Merged
merged 3 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,32 +186,6 @@ editing the `conf` file in a text editor. Use the examples as reference.
</tr>
</table>

### channels

<table>
<tr>
<td>Description</td>
<td colspan="2">
Sunshine can support multiple clients streaming simultaneously,
at the cost of higher CPU and GPU usage.
@note{All connected clients share control of the same streaming session.}
@warning{Some hardware encoders may have limitations that reduce performance with multiple streams.}
</td>
</tr>
<tr>
<td>Default</td>
<td colspan="2">@code{}
1
@endcode</td>
</tr>
<tr>
<td>Example</td>
<td colspan="2">@code{}
channels = 1
@endcode</td>
</tr>
</table>

### global_prep_cmd

<table>
Expand Down
3 changes: 0 additions & 3 deletions src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ namespace config {
APPS_JSON_PATH,

20, // fecPercentage
1, // channels

ENCRYPTION_MODE_NEVER, // lan_encryption_mode
ENCRYPTION_MODE_OPPORTUNISTIC, // wan_encryption_mode
Expand Down Expand Up @@ -1046,8 +1045,6 @@ namespace config {
stream.ping_timeout = std::chrono::milliseconds(to);
}

int_between_f(vars, "channels", stream.channels, { 1, std::numeric_limits<int>::max() });

int_between_f(vars, "lan_encryption_mode", stream.lan_encryption_mode, { 0, 2 });
int_between_f(vars, "wan_encryption_mode", stream.wan_encryption_mode, { 0, 2 });

Expand Down
3 changes: 0 additions & 3 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ namespace config {

int fec_percentage;

// max unique instances of video and audio streams
int channels;

// Video encryption settings for LAN and WAN streams
int lan_encryption_mode;
int wan_encryption_mode;
Expand Down
5 changes: 3 additions & 2 deletions src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@
}

host_t
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port) {
host_create(af_e af, ENetAddress &addr, std::uint16_t port) {

Check warning on line 166 in src/network.cpp

View check run for this annotation

Codecov / codecov/patch

src/network.cpp#L166

Added line #L166 was not covered by tests
static std::once_flag enet_init_flag;
std::call_once(enet_init_flag, []() {
enet_initialize();
Expand All @@ -173,7 +173,8 @@
enet_address_set_host(&addr, any_addr.data());
enet_address_set_port(&addr, port);

auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, peers, 0, 0, 0) };
// Maximum of 128 clients, which should be enough for anyone
auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, 128, 0, 0, 0) };

// Enable opportunistic QoS tagging (automatically disables if the network appears to drop tagged packets)
enet_socket_set_option(host->socket, ENET_SOCKOPT_QOS, 1);
Expand Down
2 changes: 1 addition & 1 deletion src/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace net {
from_address(const std::string_view &view);

host_t
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port);
host_create(af_e af, ENetAddress &addr, std::uint16_t port);

/**
* @brief Get the address family enum value from a string.
Expand Down
30 changes: 2 additions & 28 deletions src/nvhttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,14 +820,6 @@ namespace nvhttp {
response->close_connection_after_response = true;
});

if (rtsp_stream::session_count() == config::stream.channels) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");

return;
}

auto args = request->parse_query_string();
if (
args.find("rikey"s) == std::end(args) ||
Expand Down Expand Up @@ -913,16 +905,6 @@ namespace nvhttp {
response->close_connection_after_response = true;
});

// It is possible that due a race condition that this if-statement gives a false negative,
// that is automatically resolved in rtsp_server_t
if (rtsp_stream::session_count() == config::stream.channels) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");

return;
}

auto current_appid = proc::proc.running();
if (current_appid == 0) {
tree.put("root.resume", 0);
Expand Down Expand Up @@ -999,19 +981,11 @@ namespace nvhttp {
response->close_connection_after_response = true;
});

// It is possible that due a race condition that this if-statement gives a false positive,
// the client should try again
if (rtsp_stream::session_count() != 0) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "All sessions must be disconnected before quitting");

return;
}

tree.put("root.cancel", 1);
tree.put("root.<xmlattr>.status_code", 200);

rtsp_stream::terminate_sessions();

if (proc::proc.running() > 0) {
proc::proc.terminate();
}
Expand Down
84 changes: 39 additions & 45 deletions src/rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "sync.h"
#include "video.h"

#include <set>
#include <unordered_map>

namespace asio = boost::asio;
Expand Down Expand Up @@ -417,13 +418,6 @@

int
bind(net::af_e af, std::uint16_t port, boost::system::error_code &ec) {
{
auto lg = _session_slots.lock();

_session_slots->resize(config::stream.channels);
_slot_count = config::stream.channels;
}

acceptor.open(af == net::IPV4 ? tcp::v4() : tcp::v6(), ec);
if (ec) {
return -1;
Expand Down Expand Up @@ -529,7 +523,6 @@
}
raised_timeout = now + config::stream.ping_timeout;

--_slot_count;
launch_event.raise(std::move(launch_session));
}

Expand All @@ -552,9 +545,14 @@
}
}

/**
* @brief Get the number of active sessions.
* @return Count of active sessions.
*/
int
session_count() const {
return config::stream.channels - _slot_count;
session_count() {
auto lg = _session_slots.lock();
return _session_slots->size();

Check warning on line 555 in src/rtsp.cpp

View check run for this annotation

Codecov / codecov/patch

src/rtsp.cpp#L553-L555

Added lines #L553 - L555 were not covered by tests
}

safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
Expand All @@ -573,20 +571,21 @@
auto discarded = launch_event.pop(0s);
if (discarded) {
BOOST_LOG(debug) << "Event timeout: "sv << discarded->unique_id;
++_slot_count;
}
}

auto lg = _session_slots.lock();

for (auto &slot : *_session_slots) {
if (slot && (all || stream::session::state(*slot) == stream::session::state_e::STOPPING)) {
stream::session::stop(*slot);
stream::session::join(*slot);

slot.reset();
for (auto i = _session_slots->begin(); i != _session_slots->end();) {
auto &slot = *(*i);
if (all || stream::session::state(slot) == stream::session::state_e::STOPPING) {
stream::session::stop(slot);
stream::session::join(slot);

++_slot_count;
i = _session_slots->erase(i);
}
else {
i++;
}
}

Expand All @@ -595,36 +594,33 @@
}
}

/**
* @brief Removes the provided session from the set of sessions.
* @param session The session to remove.
*/
void
clear(std::shared_ptr<stream::session_t> *session_p) {
remove(const std::shared_ptr<stream::session_t> &session) {

Check warning on line 602 in src/rtsp.cpp

View check run for this annotation

Codecov / codecov/patch

src/rtsp.cpp#L602

Added line #L602 was not covered by tests
auto lg = _session_slots.lock();

session_p->reset();

++_slot_count;
_session_slots->erase(session);
}

std::shared_ptr<stream::session_t> *
accept(std::shared_ptr<stream::session_t> &session) {
/**
* @brief Inserts the provided session into the set of sessions.
* @param session The session to insert.
*/
void
insert(const std::shared_ptr<stream::session_t> &session) {

Check warning on line 612 in src/rtsp.cpp

View check run for this annotation

Codecov / codecov/patch

src/rtsp.cpp#L612

Added line #L612 was not covered by tests
auto lg = _session_slots.lock();

for (auto &slot : *_session_slots) {
if (!slot) {
slot = session;
return &slot;
}
}

return nullptr;
_session_slots->emplace(session);
BOOST_LOG(info) << "New streaming session started [active sessions: "sv << _session_slots->size() << ']';
}

private:
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;

sync_util::sync_t<std::vector<std::shared_ptr<stream::session_t>>> _session_slots;
sync_util::sync_t<std::set<std::shared_ptr<stream::session_t>>> _session_slots;

std::chrono::steady_clock::time_point raised_timeout;
int _slot_count;

boost::asio::io_service ios;
tcp::acceptor acceptor { ios };
Expand Down Expand Up @@ -652,6 +648,11 @@
return server.session_count();
}

void
terminate_sessions() {
server.clear(true);

Check warning on line 653 in src/rtsp.cpp

View check run for this annotation

Codecov / codecov/patch

src/rtsp.cpp#L652-L653

Added lines #L652 - L653 were not covered by tests
}

int
send(tcp::socket &sock, const std::string_view &sv) {
std::size_t bytes_send = 0;
Expand Down Expand Up @@ -1110,19 +1111,12 @@
}

auto stream_session = stream::session::alloc(config, session);

auto slot = server->accept(stream_session);
if (!slot) {
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';

respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
server->insert(stream_session);

if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
BOOST_LOG(error) << "Failed to start a streaming session"sv;

server->clear(slot);
server->remove(stream_session);
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
return;
}
Expand Down
10 changes: 10 additions & 0 deletions src/rtsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ namespace rtsp_stream {
void
launch_session_clear(uint32_t launch_session_id);

/**
* @brief Get the number of active sessions.
* @return Count of active sessions.
*/
int
session_count();

/**
* @brief Terminates all running streaming sessions.
*/
void
terminate_sessions();

void
rtpThread();

Expand Down
2 changes: 1 addition & 1 deletion src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ namespace stream {
public:
int
bind(net::af_e address_family, std::uint16_t port) {
_host = net::host_create(address_family, _addr, config::stream.channels, port);
_host = net::host_create(address_family, _addr, port);

return !(bool) _host;
}
Expand Down
1 change: 0 additions & 1 deletion src_assets/common/assets/web/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ <h1 class="my-4">{{ $t('config.configuration') }}</h1>
id: "advanced",
name: "Advanced",
options: {
"channels": 1,
"fec_percentage": 20,
"qp": 28,
"min_threads": 2,
Expand Down
10 changes: 0 additions & 10 deletions src_assets/common/assets/web/configs/tabs/General.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,6 @@ function removeCmd(index) {
<div class="form-text">{{ $t('config.log_level_desc') }}</div>
</div>

<!-- Maximum Connected Clients -->
<div class="mb-3">
<label for="channels" class="form-label">{{ $t('config.channels') }}</label>
<input type="text" class="form-control" id="channels" placeholder="1" v-model="config.channels" />
<div class="form-text">
{{ $t('config.channels_desc_1') }}<br>
{{ $t('_common.note') }} {{ $t('config.channels_desc_2') }}
</div>
</div>

<!-- Global Prep Commands -->
<div id="global_prep_cmd" class="mb-3 d-flex flex-column">
<label class="form-label">{{ $t('config.global_prep_cmd') }}</label>
Expand Down
Loading