Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Discord heartbeat interval in audio. #2765

Merged
merged 2 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Discord.Net.Core/DiscordConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,15 @@ public class DiscordConfig
/// </summary>
public const int MaxApplicationTagCount = 5;

/// <summary>
/// Returns the factor to reduce the heartbeat interval.
/// </summary>
/// <remarks>
/// If a heartbeat takes longer than the interval estimated by Discord, the connection will be closed.
/// This factor is used to reduce the interval and ensure that Discord will get the heartbeat within the estimated interval.
/// </remarks>
internal const double HeartbeatIntervalFactor = 0.9;

/// <summary>
/// Returns the maximum length of a voice channel status.
/// </summary>
Expand Down
86 changes: 49 additions & 37 deletions src/Discord.Net.WebSocket/Audio/AudioClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ namespace Discord.Audio
//TODO: Add audio reconnecting
internal partial class AudioClient : IAudioClient
{
internal struct StreamPair
private static readonly int ConnectionTimeoutMs = 30000; // 30 seconds
private static readonly int KeepAliveIntervalMs = 5000; // 5 seconds

private struct StreamPair
{
public AudioInStream Reader;
public AudioOutStream Writer;
Expand All @@ -40,6 +43,7 @@ public StreamPair(AudioInStream reader, AudioOutStream writer)
private readonly ConcurrentDictionary<ulong, StreamPair> _streams;

private Task _heartbeatTask, _keepaliveTask;
private int _heartbeatInterval;
private long _lastMessageTime;
private string _url, _sessionId, _token;
private ulong _userId;
Expand Down Expand Up @@ -71,7 +75,7 @@ internal AudioClient(SocketGuild guild, int clientId, ulong channelId)
ApiClient.ReceivedPacket += ProcessPacketAsync;

_stateLock = new SemaphoreSlim(1, 1);
_connection = new ConnectionManager(_stateLock, _audioLogger, 30000,
_connection = new ConnectionManager(_stateLock, _audioLogger, ConnectionTimeoutMs,
OnConnectingAsync, OnDisconnectingAsync, x => ApiClient.Disconnected += x);
_connection.Connected += () => _connectedEvent.InvokeAsync();
_connection.Disconnected += (ex, recon) => _disconnectedEvent.InvokeAsync(ex);
Expand Down Expand Up @@ -113,8 +117,8 @@ public async Task StopAsync()
private async Task OnConnectingAsync()
{
await _audioLogger.DebugAsync("Connecting ApiClient").ConfigureAwait(false);
await ApiClient.ConnectAsync("wss://" + _url + "?v=" + DiscordConfig.VoiceAPIVersion).ConfigureAwait(false);
await _audioLogger.DebugAsync("Listening on port " + ApiClient.UdpPort).ConfigureAwait(false);
await ApiClient.ConnectAsync($"wss://{_url}?v={DiscordConfig.VoiceAPIVersion}").ConfigureAwait(false);
await _audioLogger.DebugAsync($"Listening on port {ApiClient.UdpPort}").ConfigureAwait(false);
await _audioLogger.DebugAsync("Sending Identity").ConfigureAwait(false);
await ApiClient.SendIdentityAsync(_userId, _sessionId, _token).ConfigureAwait(false);

Expand All @@ -128,13 +132,13 @@ private async Task OnDisconnectingAsync(Exception ex)

//Wait for tasks to complete
await _audioLogger.DebugAsync("Waiting for heartbeater").ConfigureAwait(false);
var heartbeatTask = _heartbeatTask;
if (heartbeatTask != null)
await heartbeatTask.ConfigureAwait(false);

if (_heartbeatTask != null)
await _heartbeatTask.ConfigureAwait(false);
_heartbeatTask = null;
var keepaliveTask = _keepaliveTask;
if (keepaliveTask != null)
await keepaliveTask.ConfigureAwait(false);

if (_keepaliveTask != null)
await _keepaliveTask.ConfigureAwait(false);
_keepaliveTask = null;

while (_heartbeatTimes.TryDequeue(out _))
Expand Down Expand Up @@ -194,11 +198,12 @@ internal AudioInStream GetInputStream(ulong id)
{
if (_streams.TryGetValue(id, out StreamPair streamPair))
return streamPair.Reader;

return null;
}
internal async Task RemoveInputStreamAsync(ulong userId)
{
if (_streams.TryRemove(userId, out var pair))
if (_streams.TryRemove(userId, out StreamPair pair))
{
await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
pair.Reader.Dispose();
Expand Down Expand Up @@ -236,8 +241,7 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
ApiClient.SetUdpEndpoint(data.Ip, data.Port);
await ApiClient.SendDiscoveryAsync(_ssrc).ConfigureAwait(false);


_heartbeatTask = RunHeartbeatAsync(41250, _connection.CancelToken);
_heartbeatTask = RunHeartbeatAsync(_heartbeatInterval, _connection.CancelToken);
}
break;
case VoiceOpCode.SessionDescription:
Expand All @@ -250,10 +254,10 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)

SecretKey = data.SecretKey;
_isSpeaking = false;
await ApiClient.SendSetSpeaking(false).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(5000, _connection.CancelToken);
await ApiClient.SendSetSpeaking(_isSpeaking).ConfigureAwait(false);
_keepaliveTask = RunKeepaliveAsync(_connection.CancelToken);

var _ = _connection.CompleteAsync();
_ = _connection.CompleteAsync();
}
break;
case VoiceOpCode.HeartbeatAck:
Expand All @@ -270,6 +274,14 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
}
}
break;
case VoiceOpCode.Hello:
{
await _audioLogger.DebugAsync("Received Hello").ConfigureAwait(false);
var data = (payload as JToken).ToObject<HelloEvent>(_serializer);

_heartbeatInterval = data.HeartbeatInterval;
}
break;
case VoiceOpCode.Speaking:
{
await _audioLogger.DebugAsync("Received Speaking").ConfigureAwait(false);
Expand All @@ -291,13 +303,12 @@ private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
break;
default:
await _audioLogger.WarningAsync($"Unknown OpCode ({opCode})").ConfigureAwait(false);
return;
break;
}
}
catch (Exception ex)
{
await _audioLogger.ErrorAsync($"Error handling {opCode}", ex).ConfigureAwait(false);
return;
}
}
private async Task ProcessPacketAsync(byte[] packet)
Expand Down Expand Up @@ -358,29 +369,28 @@ private async Task ProcessPacketAsync(byte[] packet)
}
else
{
if (!RTPReadStream.TryReadSsrc(packet, 0, out var ssrc))
if (!RTPReadStream.TryReadSsrc(packet, 0, out uint ssrc))
{
await _audioLogger.DebugAsync("Malformed Frame").ConfigureAwait(false);
return;
}
if (!_ssrcMap.TryGetValue(ssrc, out var userId))
else if (!_ssrcMap.TryGetValue(ssrc, out ulong userId))
{
await _audioLogger.DebugAsync($"Unknown SSRC {ssrc}").ConfigureAwait(false);
return;
}
if (!_streams.TryGetValue(userId, out var pair))
else if (!_streams.TryGetValue(userId, out StreamPair pair))
{
await _audioLogger.DebugAsync($"Unknown User {userId}").ConfigureAwait(false);
return;
}
try
else
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
return;
try
{
await pair.Writer.WriteAsync(packet, 0, packet.Length).ConfigureAwait(false);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync("Malformed Frame", ex).ConfigureAwait(false);
}
}
//await _audioLogger.DebugAsync($"Received {packet.Length} bytes from user {userId}").ConfigureAwait(false);
}
Expand All @@ -389,19 +399,20 @@ private async Task ProcessPacketAsync(byte[] packet)
catch (Exception ex)
{
await _audioLogger.WarningAsync("Failed to process UDP packet", ex).ConfigureAwait(false);
return;
}
}

private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
{
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);

// TODO: Clean this up when Discord's session patch is live
try
{
await _audioLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
int now = Environment.TickCount;

//Did server respond to our last heartbeat?
if (_heartbeatTimes.Count != 0 && (now - _lastMessageTime) > intervalMillis &&
Expand All @@ -421,7 +432,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _audioLogger.WarningAsync("Failed to send heartbeat", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
int delay = Math.Max(0, delayInterval - Latency);
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
Expand All @@ -434,14 +446,14 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _audioLogger.ErrorAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}
}
private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cancelToken)
private async Task RunKeepaliveAsync(CancellationToken cancelToken)
{
try
{
await _audioLogger.DebugAsync("Keepalive Started").ConfigureAwait(false);
while (!cancelToken.IsCancellationRequested)
{
var now = Environment.TickCount;
int now = Environment.TickCount;

try
{
Expand All @@ -454,7 +466,7 @@ private async Task RunKeepaliveAsync(int intervalMillis, CancellationToken cance
await _audioLogger.WarningAsync("Failed to send keepalive", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
await Task.Delay(KeepAliveIntervalMs, cancelToken).ConfigureAwait(false);
}
await _audioLogger.DebugAsync("Keepalive Stopped").ConfigureAwait(false);
}
Expand Down
5 changes: 4 additions & 1 deletion src/Discord.Net.WebSocket/DiscordSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,8 @@ private async Task ProcessMessageAsync(GatewayOpCode opCode, int? seq, string ty

private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cancelToken)
{
int delayInterval = (int)(intervalMillis * DiscordConfig.HeartbeatIntervalFactor);

try
{
await _gatewayLogger.DebugAsync("Heartbeat Started").ConfigureAwait(false);
Expand Down Expand Up @@ -3227,7 +3229,8 @@ private async Task RunHeartbeatAsync(int intervalMillis, CancellationToken cance
await _gatewayLogger.WarningAsync("Heartbeat Errored", ex).ConfigureAwait(false);
}

await Task.Delay(intervalMillis, cancelToken).ConfigureAwait(false);
int delay = Math.Max(0, delayInterval - Latency);
await Task.Delay(delay, cancelToken).ConfigureAwait(false);
}
await _gatewayLogger.DebugAsync("Heartbeat Stopped").ConfigureAwait(false);
}
Expand Down