Skip to content

Commit

Permalink
feat(stream)!: remove limit on concurrent sessions and allow quitting…
Browse files Browse the repository at this point in the history
… apps with active sessions (LizardByte#3325)
  • Loading branch information
cgutman authored and qiin2333 committed Oct 25, 2024
1 parent ca1efe6 commit ea1be40
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 124 deletions.
28 changes: 1 addition & 27 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,7 @@ editing the `conf` file in a text editor. Use the examples as reference.
</tr>
</table>

### [channels](https://localhost:47990/config/#channels)

<table>
<tr>
<td>Description</td>
<td colspan="2">
Sunshine can support multiple clients streaming simultaneously,
at the cost of higher CPU and GPU usage.
@note{All connected clients share control of the same streaming session.}
@warning{Some hardware encoders may have limitations that reduce performance with multiple streams.}
</td>
</tr>
<tr>
<td>Default</td>
<td colspan="2">@code{}
1
@endcode</td>
</tr>
<tr>
<td>Example</td>
<td colspan="2">@code{}
channels = 1
@endcode</td>
</tr>
</table>

### [global_prep_cmd](https://localhost:47990/config/#global_prep_cmd)
### global_prep_cmd

<table>
<tr>
Expand Down
3 changes: 0 additions & 3 deletions src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ namespace config {
APPS_JSON_PATH,

20, // fecPercentage
1, // channels

ENCRYPTION_MODE_NEVER, // lan_encryption_mode
ENCRYPTION_MODE_OPPORTUNISTIC, // wan_encryption_mode
Expand Down Expand Up @@ -1130,8 +1129,6 @@ namespace config {
stream.ping_timeout = std::chrono::milliseconds(to);
}

int_between_f(vars, "channels", stream.channels, { 1, std::numeric_limits<int>::max() });

int_between_f(vars, "lan_encryption_mode", stream.lan_encryption_mode, { 0, 2 });
int_between_f(vars, "wan_encryption_mode", stream.wan_encryption_mode, { 0, 2 });

Expand Down
3 changes: 0 additions & 3 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ namespace config {

int fec_percentage;

// max unique instances of video and audio streams
int channels;

// Video encryption settings for LAN and WAN streams
int lan_encryption_mode;
int wan_encryption_mode;
Expand Down
5 changes: 3 additions & 2 deletions src/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ namespace net {
}

host_t
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port) {
host_create(af_e af, ENetAddress &addr, std::uint16_t port) {
static std::once_flag enet_init_flag;
std::call_once(enet_init_flag, []() {
enet_initialize();
Expand All @@ -173,7 +173,8 @@ namespace net {
enet_address_set_host(&addr, any_addr.data());
enet_address_set_port(&addr, port);

auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, peers, 0, 0, 0) };
// Maximum of 128 clients, which should be enough for anyone
auto host = host_t { enet_host_create(af == IPV4 ? AF_INET : AF_INET6, &addr, 128, 0, 0, 0) };

// Enable opportunistic QoS tagging (automatically disables if the network appears to drop tagged packets)
enet_socket_set_option(host->socket, ENET_SOCKOPT_QOS, 1);
Expand Down
2 changes: 1 addition & 1 deletion src/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace net {
from_address(const std::string_view &view);

host_t
host_create(af_e af, ENetAddress &addr, std::size_t peers, std::uint16_t port);
host_create(af_e af, ENetAddress &addr, std::uint16_t port);

/**
* @brief Get the address family enum value from a string.
Expand Down
30 changes: 2 additions & 28 deletions src/nvhttp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -910,14 +910,6 @@ namespace nvhttp {
}
});

if (rtsp_stream::session_count() == config::stream.channels) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");

return;
}

auto args = request->parse_query_string();
if (
args.find("rikey"s) == std::end(args) ||
Expand Down Expand Up @@ -1014,16 +1006,6 @@ namespace nvhttp {
response->close_connection_after_response = true;
});

// It is possible that due a race condition that this if-statement gives a false negative,
// that is automatically resolved in rtsp_server_t
if (rtsp_stream::session_count() == config::stream.channels) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "The host's concurrent stream limit has been reached. Stop an existing stream or increase the 'Channels' value in the Sunshine Web UI.");

return;
}

auto current_appid = proc::proc.running();
if (current_appid == 0) {
tree.put("root.resume", 0);
Expand Down Expand Up @@ -1103,19 +1085,11 @@ namespace nvhttp {
response->close_connection_after_response = true;
});

// It is possible that due a race condition that this if-statement gives a false positive,
// the client should try again
if (rtsp_stream::session_count() != 0) {
tree.put("root.resume", 0);
tree.put("root.<xmlattr>.status_code", 503);
tree.put("root.<xmlattr>.status_message", "All sessions must be disconnected before quitting");

return;
}

tree.put("root.cancel", 1);
tree.put("root.<xmlattr>.status_code", 200);

rtsp_stream::terminate_sessions();

if (proc::proc.running() > 0) {
proc::proc.terminate();
}
Expand Down
87 changes: 39 additions & 48 deletions src/rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C" {
#include "sync.h"
#include "video.h"

#include <set>
#include <unordered_map>

namespace asio = boost::asio;
Expand Down Expand Up @@ -417,13 +418,6 @@ namespace rtsp_stream {

int
bind(net::af_e af, std::uint16_t port, boost::system::error_code &ec) {
{
auto lg = _session_slots.lock();

_session_slots->resize(config::stream.channels);
_slot_count = config::stream.channels;
}

acceptor.open(af == net::IPV4 ? tcp::v4() : tcp::v6(), ec);
if (ec) {
return -1;
Expand Down Expand Up @@ -529,10 +523,6 @@ namespace rtsp_stream {
}
raised_timeout = now + config::stream.ping_timeout;

--_slot_count;

launch_session->env["SUNSHINE_CLIENT_SLOT"] = std::to_string(session_count());

launch_event.raise(std::move(launch_session));
}

Expand All @@ -555,9 +545,14 @@ namespace rtsp_stream {
}
}

/**
* @brief Get the number of active sessions.
* @return Count of active sessions.
*/
int
session_count() const {
return config::stream.channels - _slot_count;
session_count() {
auto lg = _session_slots.lock();
return _session_slots->size();
}

safe::event_t<std::shared_ptr<launch_session_t>> launch_event;
Expand All @@ -576,20 +571,21 @@ namespace rtsp_stream {
auto discarded = launch_event.pop(0s);
if (discarded) {
BOOST_LOG(debug) << "Event timeout: "sv << discarded->unique_id;
++_slot_count;
}
}

auto lg = _session_slots.lock();

for (auto &slot : *_session_slots) {
if (slot && (all || stream::session::state(*slot) == stream::session::state_e::STOPPING)) {
stream::session::stop(*slot);
stream::session::join(*slot);
for (auto i = _session_slots->begin(); i != _session_slots->end();) {
auto &slot = *(*i);
if (all || stream::session::state(slot) == stream::session::state_e::STOPPING) {
stream::session::stop(slot);
stream::session::join(slot);

slot.reset();

++_slot_count;
i = _session_slots->erase(i);
}
else {
i++;
}
}

Expand All @@ -598,36 +594,33 @@ namespace rtsp_stream {
}
}

/**
* @brief Removes the provided session from the set of sessions.
* @param session The session to remove.
*/
void
clear(std::shared_ptr<stream::session_t> *session_p) {
remove(const std::shared_ptr<stream::session_t> &session) {
auto lg = _session_slots.lock();

session_p->reset();

++_slot_count;
_session_slots->erase(session);
}

std::shared_ptr<stream::session_t> *
accept(std::shared_ptr<stream::session_t> &session) {
/**
* @brief Inserts the provided session into the set of sessions.
* @param session The session to insert.
*/
void
insert(const std::shared_ptr<stream::session_t> &session) {
auto lg = _session_slots.lock();

for (auto &slot : *_session_slots) {
if (!slot) {
slot = session;
return &slot;
}
}

return nullptr;
_session_slots->emplace(session);
BOOST_LOG(info) << "New streaming session started [active sessions: "sv << _session_slots->size() << ']';
}

private:
std::unordered_map<std::string_view, cmd_func_t> _map_cmd_cb;

sync_util::sync_t<std::vector<std::shared_ptr<stream::session_t>>> _session_slots;
sync_util::sync_t<std::set<std::shared_ptr<stream::session_t>>> _session_slots;

std::chrono::steady_clock::time_point raised_timeout;
int _slot_count;

boost::asio::io_service ios;
tcp::acceptor acceptor { ios };
Expand Down Expand Up @@ -655,6 +648,11 @@ namespace rtsp_stream {
return server.session_count();
}

void
terminate_sessions() {
server.clear(true);
}

int
send(tcp::socket &sock, const std::string_view &sv) {
std::size_t bytes_send = 0;
Expand Down Expand Up @@ -1113,19 +1111,12 @@ namespace rtsp_stream {
}

auto stream_session = stream::session::alloc(config, session);

auto slot = server->accept(stream_session);
if (!slot) {
BOOST_LOG(info) << "Ran out of slots for client from ["sv << ']';

respond(sock, session, &option, 503, "Service Unavailable", req->sequenceNumber, {});
return;
}
server->insert(stream_session);

if (stream::session::start(*stream_session, sock.remote_endpoint().address().to_string())) {
BOOST_LOG(error) << "Failed to start a streaming session"sv;

server->clear(slot);
server->remove(stream_session);
respond(sock, session, &option, 500, "Internal Server Error", req->sequenceNumber, {});
return;
}
Expand Down
10 changes: 10 additions & 0 deletions src/rtsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,19 @@ namespace rtsp_stream {
void
launch_session_clear(uint32_t launch_session_id);

/**
* @brief Get the number of active sessions.
* @return Count of active sessions.
*/
int
session_count();

/**
* @brief Terminates all running streaming sessions.
*/
void
terminate_sessions();

void
rtpThread();

Expand Down
2 changes: 1 addition & 1 deletion src/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ namespace stream {
public:
int
bind(net::af_e address_family, std::uint16_t port) {
_host = net::host_create(address_family, _addr, config::stream.channels, port);
_host = net::host_create(address_family, _addr, port);

return !(bool) _host;
}
Expand Down
1 change: 0 additions & 1 deletion src_assets/common/assets/web/config.html
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ <h1 class="my-4">{{ $t('config.configuration') }}</h1>
id: "advanced",
name: "Advanced",
options: {
channels: 1,
fec_percentage: 20,
qp: 28,
min_threads: 2,
Expand Down
10 changes: 0 additions & 10 deletions src_assets/common/assets/web/configs/tabs/General.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,6 @@ function removeCmd(index) {
<div class="form-text">{{ $t('config.log_level_desc') }}</div>
</div>

<!-- Maximum Connected Clients -->
<div class="mb-3">
<label for="channels" class="form-label">{{ $t('config.channels') }}</label>
<input type="text" class="form-control" id="channels" placeholder="1" v-model="config.channels" />
<div class="form-text">
{{ $t('config.channels_desc_1') }}<br>
{{ $t('_common.note') }} {{ $t('config.channels_desc_2') }}
</div>
</div>

<!-- Global Prep Commands -->
<div id="global_prep_cmd" class="mb-3 d-flex flex-column">
<label class="form-label">{{ $t('config.global_prep_cmd') }}</label>
Expand Down

0 comments on commit ea1be40

Please sign in to comment.