Skip to content

Commit

Permalink
[Client encryption]: Adds Stream based processor (#4806)
Browse files Browse the repository at this point in the history
# Pull Request Template

## Description

- JsonProcessor.Stream was added to drop DOM overhead.
- EncryptionProcessor has added overloads allowing consumer provision of
output stream
- tests were extended to new processor
- performance tests were extended by provided stream overloads.
RecyclableMemoryStream 3.0.1 is being used.

## Type of change

Please delete options that are not relevant.

- [] New feature (non-breaking change which adds functionality)
- [] This change might require a documentation update

## Closing issues

Contributes to #4678

---------

Co-authored-by: Juraj Blazek <[email protected]>
Co-authored-by: juraj-blazek <[email protected]>
Co-authored-by: Santosh Kulkarni <[email protected]>
Co-authored-by: Kiran Kumar Kolli <[email protected]>
  • Loading branch information
5 people authored Oct 23, 2024
1 parent 89c79a2 commit f1e5c2f
Show file tree
Hide file tree
Showing 18 changed files with 1,177 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ internal static async Task<DecryptionContext> DecryptContentAsync(
{
_ = diagnosticsContext;

if (encryptionProperties.EncryptionFormatVersion != 2)
if (encryptionProperties.EncryptionFormatVersion != EncryptionFormatVersion.AeAes)
{
throw new NotSupportedException($"Unknown encryption format version: {encryptionProperties.EncryptionFormatVersion}. Please upgrade your SDK to the latest version.");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Encryption.Custom
{
internal static class EncryptionFormatVersion
{
public const int AeAes = 2;
public const int Mde = 3;
public const int MdeWithCompression = 4;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@ public enum JsonProcessor
/// </summary>
Newtonsoft,

#if NET8_0_OR_GREATER
#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
/// <summary>
/// System.Text.Json
/// </summary>
/// <remarks>Available with .NET8.0 package only.</remarks>
SystemTextJson,

/// <summary>
/// Ut8JsonReader/Writer
/// </summary>
/// <remarks>Available with .NET8.0 package only.</remarks>
Stream,
#endif
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Microsoft.Azure.Cosmos.Encryption.Custom
{
using System;
using System.Collections.Generic;
using System.Linq;

internal static class EncryptionOptionsExtensions
{
Expand All @@ -31,6 +33,24 @@ internal static void Validate(this EncryptionOptions options)
#pragma warning restore CA2208 // Instantiate argument exceptions correctly
}

if (options.PathsToEncrypt is not HashSet<string> && options.PathsToEncrypt.Distinct().Count() != options.PathsToEncrypt.Count())
{
throw new InvalidOperationException("Duplicate paths in PathsToEncrypt passed via EncryptionOptions.");
}

foreach (string path in options.PathsToEncrypt)
{
if (string.IsNullOrWhiteSpace(path) || path[0] != '/' || path.IndexOf('/', 1) != -1)
{
throw new InvalidOperationException($"Invalid path {path ?? string.Empty}, {nameof(options.PathsToEncrypt)}");
}

if (path.AsSpan(1).Equals("id".AsSpan(), StringComparison.Ordinal))
{
throw new InvalidOperationException($"{nameof(options.PathsToEncrypt)} includes a invalid path: '{path}'.");
}
}

options.CompressionOptions?.Validate();
}

Expand Down
159 changes: 141 additions & 18 deletions Microsoft.Azure.Cosmos.Encryption.Custom/src/EncryptionProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ internal static class EncryptionProcessor

#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
private static readonly JsonWriterOptions JsonWriterOptions = new () { SkipValidation = true };
private static readonly StreamProcessor StreamProcessor = new ();
#endif

private static readonly MdeEncryptionProcessor MdeEncryptionProcessor = new ();
Expand Down Expand Up @@ -62,24 +63,6 @@ public static async Task<Stream> EncryptAsync(
return input;
}

if (encryptionOptions.PathsToEncrypt.Distinct().Count() != encryptionOptions.PathsToEncrypt.Count())
{
throw new InvalidOperationException("Duplicate paths in PathsToEncrypt passed via EncryptionOptions.");
}

foreach (string path in encryptionOptions.PathsToEncrypt)
{
if (string.IsNullOrWhiteSpace(path) || path[0] != '/' || path.IndexOf('/', 1) != -1)
{
throw new InvalidOperationException($"Invalid path {path ?? string.Empty}, {nameof(encryptionOptions.PathsToEncrypt)}");
}

if (path.AsSpan(1).Equals("id".AsSpan(), StringComparison.Ordinal))
{
throw new InvalidOperationException($"{nameof(encryptionOptions.PathsToEncrypt)} includes a invalid path: '{path}'.");
}
}

#pragma warning disable CS0618 // Type or member is obsolete
return encryptionOptions.EncryptionAlgorithm switch
{
Expand All @@ -90,6 +73,42 @@ public static async Task<Stream> EncryptAsync(
#pragma warning restore CS0618 // Type or member is obsolete
}

#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
public static async Task EncryptAsync(
Stream input,
Stream output,
Encryptor encryptor,
EncryptionOptions encryptionOptions,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
_ = diagnosticsContext;

ValidateInputForEncrypt(
input,
encryptor,
encryptionOptions);

if (!encryptionOptions.PathsToEncrypt.Any())
{
await input.CopyToAsync(output, cancellationToken);
return;
}

if (encryptionOptions.EncryptionAlgorithm != CosmosEncryptionAlgorithm.MdeAeadAes256CbcHmac256Randomized)
{
throw new NotSupportedException($"Streaming mode is only allowed for {nameof(CosmosEncryptionAlgorithm.MdeAeadAes256CbcHmac256Randomized)}");
}

if (encryptionOptions.JsonProcessor != JsonProcessor.Stream)
{
throw new NotSupportedException($"Streaming mode is only allowed for {nameof(JsonProcessor.Stream)}");
}

await EncryptionProcessor.StreamProcessor.EncryptStreamAsync(input, output, encryptor, encryptionOptions, cancellationToken);
}
#endif

/// <remarks>
/// If there isn't any data that needs to be decrypted, input stream will be returned without any modification.
/// Else input stream will be disposed, and a new stream is returned.
Expand Down Expand Up @@ -140,11 +159,76 @@ public static async Task<Stream> EncryptAsync(
JsonProcessor.Newtonsoft => await DecryptAsync(input, encryptor, diagnosticsContext, cancellationToken),
#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
JsonProcessor.SystemTextJson => await DecryptJsonNodeAsync(input, encryptor, diagnosticsContext, cancellationToken),
JsonProcessor.Stream => await DecryptStreamAsync(input, encryptor, diagnosticsContext, cancellationToken),
#endif
_ => throw new InvalidOperationException("Unsupported Json Processor")
};
}

#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
public static async Task<DecryptionContext> DecryptAsync(
Stream input,
Stream output,
Encryptor encryptor,
CosmosDiagnosticsContext diagnosticsContext,
JsonProcessor jsonProcessor,
CancellationToken cancellationToken)
{
if (input == null)
{
return null;
}

if (jsonProcessor != JsonProcessor.Stream)
{
throw new NotSupportedException($"Streaming mode is only allowed for {nameof(JsonProcessor.Stream)}");
}

Debug.Assert(input.CanSeek);
Debug.Assert(output.CanWrite);
Debug.Assert(output.CanSeek);
Debug.Assert(encryptor != null);
Debug.Assert(diagnosticsContext != null);
input.Position = 0;

EncryptionPropertiesWrapper properties = await System.Text.Json.JsonSerializer.DeserializeAsync<EncryptionPropertiesWrapper>(input, cancellationToken: cancellationToken);
input.Position = 0;
if (properties?.EncryptionProperties == null)
{
await input.CopyToAsync(output, cancellationToken: cancellationToken);
return null;
}

DecryptionContext context;
#pragma warning disable CS0618 // Type or member is obsolete
if (properties.EncryptionProperties.EncryptionAlgorithm == CosmosEncryptionAlgorithm.MdeAeadAes256CbcHmac256Randomized)
{
context = await StreamProcessor.DecryptStreamAsync(input, output, encryptor, properties.EncryptionProperties, diagnosticsContext, cancellationToken);
}
else if (properties.EncryptionProperties.EncryptionAlgorithm == CosmosEncryptionAlgorithm.AEAes256CbcHmacSha256Randomized)
{
(Stream stream, context) = await DecryptAsync(input, encryptor, diagnosticsContext, cancellationToken);
await stream.CopyToAsync(output, cancellationToken);
output.Position = 0;
}
else
{
input.Position = 0;
throw new NotSupportedException($"Encryption Algorithm: {properties.EncryptionProperties.EncryptionAlgorithm} is not supported.");
}
#pragma warning restore CS0618 // Type or member is obsolete

if (context == null)
{
input.Position = 0;
return null;
}

await input.DisposeAsync();
return context;
}
#endif

#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
public static async Task<(Stream, DecryptionContext)> DecryptJsonNodeAsync(
Stream input,
Expand Down Expand Up @@ -182,6 +266,45 @@ public static async Task<Stream> EncryptAsync(
}
#endif

#if ENCRYPTION_CUSTOM_PREVIEW && NET8_0_OR_GREATER
public static async Task<(Stream, DecryptionContext)> DecryptStreamAsync(
Stream input,
Encryptor encryptor,
CosmosDiagnosticsContext diagnosticsContext,
CancellationToken cancellationToken)
{
if (input == null)
{
return (input, null);
}

Debug.Assert(input.CanSeek);
Debug.Assert(encryptor != null);
Debug.Assert(diagnosticsContext != null);
input.Position = 0;

EncryptionPropertiesWrapper properties = await System.Text.Json.JsonSerializer.DeserializeAsync<EncryptionPropertiesWrapper>(input, cancellationToken: cancellationToken);
input.Position = 0;
if (properties?.EncryptionProperties == null)
{
return (input, null);
}

MemoryStream ms = new ();

DecryptionContext context = await StreamProcessor.DecryptStreamAsync(input, ms, encryptor, properties.EncryptionProperties, diagnosticsContext, cancellationToken);
if (context == null)
{
input.Position = 0;
return (input, null);
}

await input.DisposeAsync();
return (ms, context);
}

#endif

public static async Task<(JObject, DecryptionContext)> DecryptAsync(
JObject document,
Encryptor encryptor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Encryption.Custom
{
using System.Text.Json.Serialization;

internal class EncryptionPropertiesWrapper
{
[JsonPropertyName(Constants.EncryptedInfo)]
public EncryptionProperties EncryptionProperties { get; }

public EncryptionPropertiesWrapper(EncryptionProperties encryptionProperties)
{
this.EncryptionProperties = encryptionProperties;
}
}
}
Loading

0 comments on commit f1e5c2f

Please sign in to comment.