Skip to content

Commit

Permalink
Optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
HakanL committed Dec 10, 2024
1 parent 1b8a3bd commit 21ed618
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 70 deletions.
4 changes: 2 additions & 2 deletions src/Haukcode.sACN/Haukcode.sACN.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Haukcode.Network" Version="1.0.10" />
<PackageReference Include="Haukcode.Network" Version="1.0.12" />
<PackageReference Include="HdrHistogram" Version="2.5.0" />
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<PackageReference Include="System.Net.NetworkInformation" Version="4.3.0" />
Expand All @@ -52,7 +52,7 @@
</When>
<Otherwise>
<ItemGroup>
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.10" />
<PackageReference Include="Haukcode.HighPerfComm" Version="1.0.11" />
</ItemGroup>
</Otherwise>
</Choose>
Expand Down
13 changes: 10 additions & 3 deletions src/Haukcode.sACN/Model/DMPLayer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,18 @@ internal static DMPLayer Parse(BigEndianBinaryReader buffer)
short propertyValueCount = buffer.ReadInt16();

byte startCode = buffer.ReadByte();
byte[] properties = buffer.ReadBytes(propertyValueCount - 1);
if (propertyValueCount > 0)
{
var properties = buffer.ReadSlice(propertyValueCount - 1);

var dmpLayer = new DMPLayer(properties, startCode);
var dmpLayer = new DMPLayer(properties, startCode);

return dmpLayer;
return dmpLayer;
}
else
{
return new DMPLayer(Array.Empty<byte>());
}
}
}
}
121 changes: 56 additions & 65 deletions src/Haukcode.sACN/SACNClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -15,7 +16,7 @@

namespace Haukcode.sACN
{
public class SACNClient : Client<SACNClient.SendData, SocketReceiveMessageFromResult>
public class SACNClient : Client<SACNClient.SendData, ReceiveDataPacket>
{
public class SendData : HighPerfComm.SendData
{
Expand All @@ -24,12 +25,11 @@ public class SendData : HighPerfComm.SendData
public IPEndPoint? Destination { get; set; }
}

private const int ReceiveBufferSize = 680 * 40 * 400;
private const int SendBufferSize = 680 * 20 * 400;
private const int ReceiveBufferSize = 680 * 20 * 200;
private const int SendBufferSize = 680 * 20 * 200;
private static readonly IPEndPoint _blankEndpoint = new(IPAddress.Any, 0);

private readonly Socket listenMulticastSocket;
private readonly Socket listenUnicastSocket;
private Socket? listenSocket;
private readonly Socket sendSocket;
private readonly ISubject<ReceiveDataPacket> packetSubject;
private readonly Dictionary<ushort, byte> sequenceIds = [];
Expand All @@ -41,7 +41,7 @@ public class SendData : HighPerfComm.SendData
private readonly Dictionary<ushort, IPEndPoint> universeMulticastEndpoints = [];

public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int port = 5568)
: base(() => new SendData(), 1024)
: base(() => new SendData(), SACNPacket.MAX_PACKET_SIZE)
{
if (senderId == Guid.Empty)
throw new ArgumentException("Invalid sender Id", nameof(senderId));
Expand All @@ -55,21 +55,6 @@ public SACNClient(Guid senderId, string senderName, IPAddress localAddress, int

this.packetSubject = new Subject<ReceiveDataPacket>();

this.listenMulticastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenMulticastSocket.ReceiveBufferSize = ReceiveBufferSize;
SetSocketOptions(this.listenMulticastSocket);

this.listenUnicastSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenUnicastSocket.ReceiveBufferSize = ReceiveBufferSize;
SetSocketOptions(this.listenUnicastSocket);

// Linux wants IPAddress.Any to get multicast/broadcast packets
this.listenMulticastSocket.Bind(new IPEndPoint(IPAddress.Any, port));
this.listenUnicastSocket.Bind(this.localEndPoint);

// Only join local LAN group
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 1);

this.sendSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
ConfigureSendSocket(this.sendSocket);
}
Expand Down Expand Up @@ -109,8 +94,6 @@ private void ConfigureSendSocket(Socket socket)

SetSocketOptions(socket);

socket.Bind(this.localEndPoint);

// Multicast socket settings
socket.DontFragment = true;
socket.MulticastLoopback = false;
Expand All @@ -136,29 +119,33 @@ private void ConfigureSendSocket(Socket socket)
/// </summary>
public IReadOnlyCollection<ushort> DMXUniverses => this.dmxUniverses.ToList();

protected override bool SupportsTwoReceivers => true;

public void JoinDMXUniverse(ushort universeId)
{
if (this.listenSocket == null)
throw new ArgumentNullException();

if (this.dmxUniverses.Contains(universeId))
throw new InvalidOperationException($"You have already joined the DMX Universe {universeId}");

// Join group
var option = new MulticastOption(SACNCommon.GetMulticastAddress(universeId), this.localEndPoint.Address);
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, option);
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.AddMembership, option);

// Add to the list of universes we have joined
this.dmxUniverses.Add(universeId);
}

public void DropDMXUniverse(ushort universeId)
{
if (this.listenSocket == null)
return;

if (!this.dmxUniverses.Contains(universeId))
throw new InvalidOperationException($"You are trying to drop the DMX Universe {universeId} but you are not a member");

// Drop group
var option = new MulticastOption(SACNCommon.GetMulticastAddress(universeId), this.localEndPoint.Address);
this.listenMulticastSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.DropMembership, option);
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.DropMembership, option);

// Remove from the list of universes we have joined
this.dmxUniverses.Remove(universeId);
Expand Down Expand Up @@ -275,26 +262,6 @@ protected override void Dispose(bool disposing)

if (disposing)
{
try
{
this.listenMulticastSocket.Shutdown(SocketShutdown.Both);
}
catch
{
}
this.listenMulticastSocket.Close();
this.listenMulticastSocket.Dispose();

try
{
this.listenUnicastSocket.Shutdown(SocketShutdown.Both);
}
catch
{
}
this.listenUnicastSocket.Close();
this.listenUnicastSocket.Dispose();

try
{
this.sendSocket.Shutdown(SocketShutdown.Both);
Expand Down Expand Up @@ -323,43 +290,67 @@ protected override ValueTask<int> SendPacketAsync(SendData sendData, ReadOnlyMem
return this.sendSocket.SendToAsync(payload, SocketFlags.None, destination);
}

protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData1(Memory<byte> memory, CancellationToken cancelToken)
protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData(Memory<byte> memory, CancellationToken cancelToken)
{
var result = await this.listenMulticastSocket.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);
var result = await this.listenSocket!.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);

return (result.ReceivedBytes, result);
}

protected async override ValueTask<(int ReceivedBytes, SocketReceiveMessageFromResult Result)> ReceiveData2(Memory<byte> memory, CancellationToken cancelToken)
protected override ReceiveDataPacket? TryParseObject(ReadOnlyMemory<byte> buffer, double timestampMS, IPEndPoint sourceIP, IPAddress destinationIP)
{
var result = await this.listenUnicastSocket.ReceiveMessageFromAsync(memory, SocketFlags.None, _blankEndpoint, cancelToken);

return (result.ReceivedBytes, result);
}

protected override void ParseReceiveData(ReadOnlyMemory<byte> memory, SocketReceiveMessageFromResult result, double timestampMS)
{
var packet = SACNPacket.Parse(memory);
var packet = SACNPacket.Parse(buffer);

// Note that we're still using the memory from the pipeline here, the packet is not allocating its own DMX data byte array
if (packet != null)
{
var newPacket = new ReceiveDataPacket
var parsedObject = new ReceiveDataPacket
{
TimestampMS = timestampMS,
Source = (IPEndPoint)result.RemoteEndPoint,
Source = sourceIP,
Packet = packet
};

if (!this.endPointCache.TryGetValue(result.PacketInformation.Address, out var ipEndPoint))
if (!this.endPointCache.TryGetValue(destinationIP, out var ipEndPoint))
{
ipEndPoint = (new IPEndPoint(result.PacketInformation.Address, this.localEndPoint.Port), result.PacketInformation.Address.GetAddressBytes()[0] == 239);
this.endPointCache.Add(result.PacketInformation.Address, ipEndPoint);
ipEndPoint = (new IPEndPoint(destinationIP, this.localEndPoint.Port), destinationIP.GetAddressBytes()[0] == 239);
this.endPointCache.Add(destinationIP, ipEndPoint);
}

newPacket.Destination = ipEndPoint.Multicast ? null : ipEndPoint.EndPoint;
parsedObject.Destination = ipEndPoint.Multicast ? null : ipEndPoint.EndPoint;

return parsedObject;
}

return null;
}

protected override void InitializeReceiveSocket()
{
this.listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
this.listenSocket.ReceiveBufferSize = ReceiveBufferSize;
SetSocketOptions(this.listenSocket);

// Linux wants IPAddress.Any to get all types of packets (unicast/multicast/broadcast)
this.listenSocket.Bind(new IPEndPoint(IPAddress.Any, this.localEndPoint.Port));

// Only join local LAN group
this.listenSocket.SetSocketOption(SocketOptionLevel.IP, SocketOptionName.MulticastTimeToLive, 1);
}

this.packetSubject.OnNext(newPacket);
protected override void DisposeReceiveSocket()
{
try
{
this.listenSocket?.Shutdown(SocketShutdown.Both);
}
catch
{
}

this.listenSocket?.Close();
this.listenSocket?.Dispose();
this.listenSocket = null;
}
}
}

0 comments on commit 21ed618

Please sign in to comment.