Skip to content

Commit

Permalink
Structured Message Decode Stream (Azure#42079)
Browse files Browse the repository at this point in the history
* Initial implementation and basic test

* seek/write tests

* fix test param

* fix exceptions
  • Loading branch information
jaschrep-msft committed Aug 12, 2024
1 parent e6269c7 commit b5b4610
Show file tree
Hide file tree
Showing 7 changed files with 835 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@
<Compile Include="$(AzureStorageSharedSources)StorageServerTimeoutPolicy.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StorageTelemetryPolicy.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StorageVersionExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessage.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageDecodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)StructuredMessageEncodingStream.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UriExtensions.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UriQueryParamsCollection.cs" LinkBase="Shared" />
<Compile Include="$(AzureStorageSharedSources)UserDelegationKeyProperties.cs" LinkBase="Shared" />
Expand Down
16 changes: 16 additions & 0 deletions sdk/storage/Azure.Storage.Common/src/Shared/Errors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ internal static void VerifyStreamPosition(Stream stream, string streamName)
}
}

internal static void AssertBufferMinimumSize(ReadOnlySpan<byte> buffer, int minSize, string paramName)
{
if (buffer.Length < minSize)
{
throw new ArgumentException($"Expected buffer Length of at least {minSize} bytes. Got {buffer.Length}.", paramName);
}
}

internal static void AssertBufferExactSize(ReadOnlySpan<byte> buffer, int size, string paramName)
{
if (buffer.Length != size)
{
throw new ArgumentException($"Expected buffer Length of exactly {size} bytes. Got {buffer.Length}.", paramName);
}
}

public static void ThrowIfParamNull(object obj, string paramName)
{
if (obj == null)
Expand Down
191 changes: 191 additions & 0 deletions sdk/storage/Azure.Storage.Common/src/Shared/StructuredMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.Buffers.Binary;
using System.IO;
using System.Security.Cryptography;
using Azure.Core;

namespace Azure.Storage.Shared;

internal static class StructuredMessage
{
public const int Crc64Length = 8;

[Flags]
public enum Flags
{
None = 0,
CrcSegment = 1,
}

public static class V1_0
{
public const byte MessageVersionByte = 1;

public const int StreamHeaderLength = 13;
public const int SegmentHeaderLength = 10;

#region Stream Header
public static void ReadStreamHeader(
ReadOnlySpan<byte> buffer,
out long messageLength,
out Flags flags,
out int totalSegments)
{
Errors.AssertBufferExactSize(buffer, 13, nameof(buffer));
if (buffer[0] != 1)
{
throw new InvalidDataException("Unrecognized version of structured message.");
}
messageLength = (long)BinaryPrimitives.ReadUInt64LittleEndian(buffer.Slice(1, 8));
flags = (Flags)BinaryPrimitives.ReadUInt16LittleEndian(buffer.Slice(9, 2));
totalSegments = BinaryPrimitives.ReadUInt16LittleEndian(buffer.Slice(11, 2));
}

public static int WriteStreamHeader(
Span<byte> buffer,
long messageLength,
Flags flags,
int totalSegments)
{
const int versionOffset = 0;
const int messageLengthOffset = 1;
const int flagsOffset = 9;
const int numSegmentsOffset = 11;

Errors.AssertBufferMinimumSize(buffer, StreamHeaderLength, nameof(buffer));

buffer[versionOffset] = MessageVersionByte;
BinaryPrimitives.WriteUInt64LittleEndian(buffer.Slice(messageLengthOffset, 8), (ulong)messageLength);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.Slice(flagsOffset, 2), (ushort)flags);
BinaryPrimitives.WriteUInt16LittleEndian(buffer.Slice(numSegmentsOffset, 2), (ushort)totalSegments);

return StreamHeaderLength;
}

/// <summary>
/// Gets stream header in a buffer rented from the provided ArrayPool.
/// </summary>
/// <returns>
/// Disposable to return the buffer to the pool.
/// </returns>
public static IDisposable GetStreamHeaderBytes(
ArrayPool<byte> pool,
out Memory<byte> bytes,
long messageLength,
Flags flags,
int totalSegments)
{
Argument.AssertNotNull(pool, nameof(pool));
IDisposable disposable = pool.RentAsMemoryDisposable(StreamHeaderLength, out bytes);
WriteStreamHeader(bytes.Span, messageLength, flags, totalSegments);
return disposable;
}
#endregion

// no stream footer content in 1.0

#region SegmentHeader
public static void ReadSegmentHeader(
ReadOnlySpan<byte> buffer,
out int segmentNum,
out long contentLength)
{
Errors.AssertBufferExactSize(buffer, 10, nameof(buffer));
segmentNum = BinaryPrimitives.ReadUInt16LittleEndian(buffer.Slice(0, 2));
contentLength = (long)BinaryPrimitives.ReadUInt64LittleEndian(buffer.Slice(2, 8));
}

public static int WriteSegmentHeader(Span<byte> buffer, int segmentNum, long segmentLength)
{
const int segmentNumOffset = 0;
const int segmentLengthOffset = 2;

Errors.AssertBufferMinimumSize(buffer, SegmentHeaderLength, nameof(buffer));

BinaryPrimitives.WriteUInt16LittleEndian(buffer.Slice(segmentNumOffset, 2), (ushort)segmentNum);
BinaryPrimitives.WriteUInt64LittleEndian(buffer.Slice(segmentLengthOffset, 8), (ulong)segmentLength);

return SegmentHeaderLength;
}

/// <summary>
/// Gets segment header in a buffer rented from the provided ArrayPool.
/// </summary>
/// <returns>
/// Disposable to return the buffer to the pool.
/// </returns>
public static IDisposable GetSegmentHeaderBytes(
ArrayPool<byte> pool,
out Memory<byte> bytes,
int segmentNum,
long segmentLength)
{
Argument.AssertNotNull(pool, nameof(pool));
IDisposable disposable = pool.RentAsMemoryDisposable(SegmentHeaderLength, out bytes);
WriteSegmentHeader(bytes.Span, segmentNum, segmentLength);
return disposable;
}
#endregion

#region SegmentFooter
public static void ReadSegmentFooter(
ReadOnlySpan<byte> buffer,
Span<byte> crc64 = default)
{
int expectedBufferSize = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
expectedBufferSize += Crc64Length;
}
Errors.AssertBufferExactSize(buffer, expectedBufferSize, nameof(buffer));

if (!crc64.IsEmpty)
{
buffer.Slice(0, Crc64Length).CopyTo(crc64);
}
}

public static int WriteSegmentFooter(Span<byte> buffer, ReadOnlySpan<byte> crc64 = default)
{
int requiredSpace = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
requiredSpace += Crc64Length;
}

Errors.AssertBufferMinimumSize(buffer, requiredSpace, nameof(buffer));
int offset = 0;
if (!crc64.IsEmpty)
{
crc64.CopyTo(buffer.Slice(offset, Crc64Length));
offset += Crc64Length;
}

return offset;
}

/// <summary>
/// Gets stream header in a buffer rented from the provided ArrayPool.
/// </summary>
/// <returns>
/// Disposable to return the buffer to the pool.
/// </returns>
public static IDisposable GetSegmentFooterBytes(
ArrayPool<byte> pool,
out Memory<byte> bytes,
ReadOnlySpan<byte> crc64 = default)
{
Argument.AssertNotNull(pool, nameof(pool));
IDisposable disposable = pool.RentAsMemoryDisposable(StreamHeaderLength, out bytes);
WriteSegmentFooter(bytes.Span, crc64);
return disposable;
}
#endregion
}
}
Loading

0 comments on commit b5b4610

Please sign in to comment.