Skip to content

Commit

Permalink
Content validation update spec (Azure#42191)
Browse files Browse the repository at this point in the history
* enum rename and footer read/write methods

* align encode/decode tests | update encoding stream

* decode stream footer

* rename
  • Loading branch information
jaschrep-msft committed Aug 12, 2024
1 parent b5b4610 commit fc13962
Show file tree
Hide file tree
Showing 7 changed files with 1,006 additions and 19 deletions.
60 changes: 57 additions & 3 deletions sdk/storage/Azure.Storage.Common/src/Shared/StructuredMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Buffers;
using System.Buffers.Binary;
using System.IO;
using System.Security.Cryptography;
using Azure.Core;

namespace Azure.Storage.Shared;
Expand All @@ -18,7 +17,7 @@ internal static class StructuredMessage
public enum Flags
{
None = 0,
CrcSegment = 1,
StorageCrc64 = 1,
}

public static class V1_0
Expand Down Expand Up @@ -86,7 +85,62 @@ public static IDisposable GetStreamHeaderBytes(
}
#endregion

// no stream footer content in 1.0
#region StreamFooter
public static void ReadStreamFooter(
ReadOnlySpan<byte> buffer,
Span<byte> crc64 = default)
{
int expectedBufferSize = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
expectedBufferSize += Crc64Length;
}
Errors.AssertBufferExactSize(buffer, expectedBufferSize, nameof(buffer));

if (!crc64.IsEmpty)
{
buffer.Slice(0, Crc64Length).CopyTo(crc64);
}
}

public static int WriteStreamFooter(Span<byte> buffer, ReadOnlySpan<byte> crc64 = default)
{
int requiredSpace = 0;
if (!crc64.IsEmpty)
{
Errors.AssertBufferExactSize(crc64, Crc64Length, nameof(crc64));
requiredSpace += Crc64Length;
}

Errors.AssertBufferMinimumSize(buffer, requiredSpace, nameof(buffer));
int offset = 0;
if (!crc64.IsEmpty)
{
crc64.CopyTo(buffer.Slice(offset, Crc64Length));
offset += Crc64Length;
}

return offset;
}

/// <summary>
/// Gets stream header in a buffer rented from the provided ArrayPool.
/// </summary>
/// <returns>
/// Disposable to return the buffer to the pool.
/// </returns>
public static IDisposable GetStreamFooterBytes(
ArrayPool<byte> pool,
out Memory<byte> bytes,
ReadOnlySpan<byte> crc64 = default)
{
Argument.AssertNotNull(pool, nameof(pool));
IDisposable disposable = pool.RentAsMemoryDisposable(StreamHeaderLength, out bytes);
WriteStreamFooter(bytes.Span, crc64);
return disposable;
}
#endregion

#region SegmentHeader
public static void ReadSegmentHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal class StructuredMessageDecodingStream : Stream
private int _metadataBufferLength = 0;

private int _streamHeaderLength;
// private readonly int _streamFooterLength; // no stream footer in 1.0
private int _streamFooterLength;
private int _segmentHeaderLength;
private int _segmentFooterLength;
private int _totalSegments;
Expand Down Expand Up @@ -170,7 +170,7 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buf, CancellationTok
private long CurrentRegionLength => _currentRegion switch
{
SMRegion.StreamHeader => _streamHeaderLength,
SMRegion.StreamFooter => 0,
SMRegion.StreamFooter => _streamFooterLength,
SMRegion.SegmentHeader => _segmentHeaderLength,
SMRegion.SegmentFooter => _segmentFooterLength,
SMRegion.SegmentContent => _currentSegmentContentLength,
Expand Down Expand Up @@ -229,6 +229,7 @@ private int Decode(Span<byte> buffer)
SMRegion.SegmentFooter => ProcessSegmentFooter(buffer.Slice(bufferConsumed)),
_ => 0,
};
// TODO surface error if processed is 0
gaps.Add((bufferConsumed, processed));
bufferConsumed += processed;
}
Expand Down Expand Up @@ -335,9 +336,10 @@ private int ProcessStreamHeader(ReadOnlySpan<byte> span)
out _innerStreamLength,
out _flags,
out _totalSegments);
if (_flags.HasFlag(StructuredMessage.Flags.CrcSegment))
if (_flags.HasFlag(StructuredMessage.Flags.StorageCrc64))
{
_segmentFooterLength = _flags.HasFlag(StructuredMessage.Flags.CrcSegment) ? StructuredMessage.Crc64Length : 0;
_segmentFooterLength = _flags.HasFlag(StructuredMessage.Flags.StorageCrc64) ? StructuredMessage.Crc64Length : 0;
_streamFooterLength = _flags.HasFlag(StructuredMessage.Flags.StorageCrc64) ? StructuredMessage.Crc64Length : 0;
_segmentCrc = StorageCrc64HashAlgorithm.Create();
_totalContentCrc = StorageCrc64HashAlgorithm.Create();
}
Expand All @@ -347,7 +349,21 @@ private int ProcessStreamHeader(ReadOnlySpan<byte> span)

private int ProcessStreamFooter(ReadOnlySpan<byte> span)
{
return 0;
int totalProcessed = 0;
if (_flags.HasFlag(StructuredMessage.Flags.StorageCrc64))
{
totalProcessed += StructuredMessage.Crc64Length;
using (ArrayPool<byte>.Shared.RentAsSpanDisposable(StructuredMessage.Crc64Length, out Span<byte> calculated))
{
_totalContentCrc.GetCurrentHash(calculated);
ReadOnlySpan<byte> expected = span.Slice(0, StructuredMessage.Crc64Length);
if (!calculated.SequenceEqual(expected))
{
throw Errors.ChecksumMismatch(calculated, expected);
}
}
}
return totalProcessed;
}

private int ProcessSegmentHeader(ReadOnlySpan<byte> span)
Expand All @@ -369,7 +385,7 @@ private int ProcessSegmentHeader(ReadOnlySpan<byte> span)
private int ProcessSegmentFooter(ReadOnlySpan<byte> span)
{
int totalProcessed = 0;
if (_flags.HasFlag(StructuredMessage.Flags.CrcSegment))
if (_flags.HasFlag(StructuredMessage.Flags.StorageCrc64))
{
totalProcessed += StructuredMessage.Crc64Length;
using (ArrayPool<byte>.Shared.RentAsSpanDisposable(StructuredMessage.Crc64Length, out Span<byte> calculated))
Expand Down
Loading

0 comments on commit fc13962

Please sign in to comment.