Skip to content

Commit

Permalink
fix: stats
Browse files Browse the repository at this point in the history
  • Loading branch information
SenexCrenshaw committed Feb 10, 2024
1 parent 23f9e15 commit 623fab1
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 37 deletions.
4 changes: 2 additions & 2 deletions StreamMaster.Streams/Factories/StreamHandlerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using StreamMaster.Streams.Streams;
namespace StreamMaster.Streams.Factories;

public sealed class StreamHandlerFactory(IClientStreamerManager clientStreamerManager, IMemoryCache memoryCache, ILoggerFactory loggerFactory, IProxyFactory proxyFactory) : IStreamHandlerFactory
public sealed class StreamHandlerFactory(IInputStatisticsManager inputStatisticsManager, IMemoryCache memoryCache, ILoggerFactory loggerFactory, IProxyFactory proxyFactory) : IStreamHandlerFactory
{
public async Task<IStreamHandler?> CreateStreamHandlerAsync(VideoStreamDto videoStreamDto, string ChannelId, string ChannelName, int rank, CancellationToken cancellationToken)
{
Expand All @@ -15,7 +15,7 @@ public sealed class StreamHandlerFactory(IClientStreamerManager clientStreamerMa
return null;
}

StreamHandler streamHandler = new(videoStreamDto, processId, memoryCache, loggerFactory);
StreamHandler streamHandler = new(videoStreamDto, processId, ChannelId, ChannelName, rank, memoryCache, loggerFactory, inputStatisticsManager);

_ = Task.Run(() => streamHandler.StartVideoStreamingAsync(stream), cancellationToken);

Expand Down
44 changes: 34 additions & 10 deletions StreamMaster.Streams/Streams/StreamHandler.Main.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Channels;

namespace StreamMaster.Streams.Streams;

Expand All @@ -23,6 +22,9 @@ public sealed partial class StreamHandler : IStreamHandler
private readonly IMemoryCache memoryCache;
private readonly ILogger<IStreamHandler> logger;

private readonly IInputStatisticsManager inputStatisticsManager;
private readonly IInputStreamingStatistics inputStreamStatistics;

public VideoStreamDto VideoStreamDto { get; }
public int M3UFileId { get; }
public int ProcessId { get; set; }
Expand All @@ -37,8 +39,21 @@ public sealed partial class StreamHandler : IStreamHandler

public bool IsFailed { get; private set; }
public int RestartCount { get; set; }

public StreamHandler(VideoStreamDto videoStreamDto, int processId, IMemoryCache memoryCache, ILoggerFactory loggerFactory)
public readonly StreamInfo StreamInfo;

/// <summary>
/// Initializes a new instance of the StreamHandler class, setting up the video stream handling,
/// logging, and statistics tracking based on the provided parameters.
/// </summary>
/// <param name="videoStreamDto">A DTO containing the video stream information, such as stream URL, name, and ID.</param>
/// <param name="processId">The process ID associated with this stream handler instance.</param>
/// <param name="channelId">The unique identifier for the channel associated with the video stream.</param>
/// <param name="channelName">The name of the channel associated with the video stream.</param>
/// <param name="rank">The rank or priority of the video stream, used for sorting or prioritization.</param>
/// <param name="memoryCache">An IMemoryCache instance for caching purposes within the stream handler.</param>
/// <param name="loggerFactory">An ILoggerFactory instance used to create loggers for logging information and events.</param>
/// <param name="inputStatisticsManager">An IInputStatisticsManager instance for managing and tracking input statistics.</param>
public StreamHandler(VideoStreamDto videoStreamDto, int processId, string channelId, string channelName, int rank, IMemoryCache memoryCache, ILoggerFactory loggerFactory, IInputStatisticsManager inputStatisticsManager)
{
this.memoryCache = memoryCache;
logger = loggerFactory.CreateLogger<StreamHandler>();
Expand All @@ -48,18 +63,25 @@ public StreamHandler(VideoStreamDto videoStreamDto, int processId, IMemoryCache
StreamUrl = videoStreamDto.User_Url;
VideoStreamId = videoStreamDto.Id;
VideoStreamName = videoStreamDto.User_Tvg_name;
this.inputStatisticsManager = inputStatisticsManager;

BoundedChannelOptions options = new(videoBufferSize)
_writeLogger = loggerFactory.CreateLogger<WriteLogger>();

StreamInfo = new StreamInfo
{
Capacity = videoBufferSize,
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = true
ChannelId = channelId,
ChannelName = channelName,
VideoStreamId = videoStreamDto.Id,
VideoStreamName = videoStreamDto.User_Tvg_name,
Logo = videoStreamDto.User_Tvg_logo,
StreamingProxyType = videoStreamDto.StreamingProxyType,
StreamUrl = videoStreamDto.User_Url,

Rank = rank
};

//videoChannel = Channel.CreateBounded<Memory<byte>>(options);
inputStreamStatistics = inputStatisticsManager.RegisterInputReader(StreamInfo);

_writeLogger = loggerFactory.CreateLogger<WriteLogger>();
}

private void OnStreamingStopped(bool InputStreamError)
Expand Down Expand Up @@ -115,6 +137,7 @@ public void RegisterClientStreamer(IClientStreamerConfiguration streamerConfigur
{

_ = clientStreamerConfigs.TryAdd(streamerConfiguration.ClientId, streamerConfiguration);
inputStreamStatistics.IncrementClient();
++ClientCount;

logger.LogInformation("RegisterClientStreamer for Client ID {ClientId} to Video Stream Id {videoStreamId} {name}", streamerConfiguration.ClientId, VideoStreamId, VideoStreamName);
Expand All @@ -132,6 +155,7 @@ public bool UnRegisterClientStreamer(Guid ClientId)
{
logger.LogInformation("UnRegisterClientStreamer ClientId: {ClientId} {name}", ClientId, VideoStreamName);
bool result = clientStreamerConfigs.TryRemove(ClientId, out _);
inputStreamStatistics.DecrementClient();
--ClientCount;

return result;
Expand Down
29 changes: 29 additions & 0 deletions StreamMaster.Streams/Streams/StreamHandler.Stats.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using StreamMaster.Domain.Cache;
using StreamMaster.Domain.Extensions;

namespace StreamMaster.Streams.Streams;


/// <summary>
/// Manages the streaming of a single video stream, including client registrations and circularRingbuffer handling.
/// </summary>
public sealed partial class StreamHandler
{
private DateTime _lastUpdateTime = SMDT.UtcNow;
private int acculmativeBytesWritten = 0;
private void SetMetrics(int bytesWritten)
{
DateTime currentTime = SMDT.UtcNow;

Setting setting = memoryCache.GetSetting();

if (setting.EnablePrometheus && (currentTime - _lastUpdateTime > TimeSpan.FromSeconds(5)))
{
inputStreamStatistics.AddBytesWritten(acculmativeBytesWritten);
_lastUpdateTime = currentTime;
acculmativeBytesWritten = 0;
}

acculmativeBytesWritten += bytesWritten;
}
}
8 changes: 4 additions & 4 deletions StreamMaster.Streams/Streams/StreamHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,10 @@ public async Task StartVideoStreamingAsync(Stream stream)
}
}

foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values)
{
clientStreamerConfig.ReadBuffer?.ReadChannel.Writer.Complete();
}
//foreach (IClientStreamerConfiguration clientStreamerConfig in clientStreamerConfigs.Values)
//{
// clientStreamerConfig.ReadBuffer?.ReadChannel.Writer.Complete();
//}

stream.Close();
stream.Dispose();
Expand Down
43 changes: 22 additions & 21 deletions StreamMaster.Streams/Streams/StreamManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,27 +108,28 @@ private async void StreamHandler_OnStreamingStoppedEvent(object? sender, StreamH
{
if (StoppedEvent.InputStreamError && streamHandler.ClientCount > 0)
{
if (await streamHandlerFactory.RestartStreamHandlerAsync(streamHandler).ConfigureAwait(false) == null)
{
OnStreamingStoppedEvent?.Invoke(sender, streamHandler);
}
else
{
//streamHandler.RestartCount = 0;
//foreach (Guid clientId in streamHandler.GetClientStreamerClientIds())
//{
// IClientStreamerConfiguration? clientStreamerConfiguration = await clientStreamerManager.GetClientStreamerConfiguration(clientId);
// if (clientStreamerConfiguration != null && clientStreamerConfiguration.ReadBuffer != null)
// {
// long _lastReadIndex = streamHandler.CircularRingBuffer.GetNextReadIndex();
// //if (_lastReadIndex > StreamHandler.ChunkSize)
// //{
// // _lastReadIndex -= StreamHandler.ChunkSize;
// //}
// clientStreamerConfiguration.ReadBuffer.SetLastIndex(_lastReadIndex);
// }
//}
}
OnStreamingStoppedEvent?.Invoke(sender, streamHandler);
//if (await streamHandlerFactory.RestartStreamHandlerAsync(streamHandler).ConfigureAwait(false) == null)
//{
// OnStreamingStoppedEvent?.Invoke(sender, streamHandler);
//}
//else
//{
// //streamHandler.RestartCount = 0;
// //foreach (Guid clientId in streamHandler.GetClientStreamerClientIds())
// //{
// // IClientStreamerConfiguration? clientStreamerConfiguration = await clientStreamerManager.GetClientStreamerConfiguration(clientId);
// // if (clientStreamerConfiguration != null && clientStreamerConfiguration.ReadBuffer != null)
// // {
// // long _lastReadIndex = streamHandler.CircularRingBuffer.GetNextReadIndex();
// // //if (_lastReadIndex > StreamHandler.ChunkSize)
// // //{
// // // _lastReadIndex -= StreamHandler.ChunkSize;
// // //}
// // clientStreamerConfiguration.ReadBuffer.SetLastIndex(_lastReadIndex);
// // }
// //}
//}
}
else
{
Expand Down

0 comments on commit 623fab1

Please sign in to comment.