Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-34636: [C#] Reduce allocations when using ArrayPool #39166

Merged
merged 1 commit into from
Dec 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 15 additions & 25 deletions csharp/src/Apache.Arrow/Extensions/ArrayPoolExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,36 @@
using System;
using System.Buffers;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;

namespace Apache.Arrow
{
internal static class ArrayPoolExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void RentReturn(this ArrayPool<byte> pool, int length, Action<Memory<byte>> action)
public static ArrayLease RentReturn(this ArrayPool<byte> pool, int length, out Memory<byte> buffer)
{
byte[] array = null;

try
{
array = pool.Rent(length);
action(array.AsMemory(0, length));
}
finally
{
if (array != null)
{
pool.Return(array);
}
}
byte[] array = pool.Rent(length);
buffer = array.AsMemory(0, length);
return new ArrayLease(pool, array);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static async ValueTask RentReturnAsync(this ArrayPool<byte> pool, int length, Func<Memory<byte>, ValueTask> action)
internal struct ArrayLease : IDisposable
{
byte[] array = null;
private readonly ArrayPool<byte> _pool;
private byte[] _array;

try
public ArrayLease(ArrayPool<byte> pool, byte[] array)
{
array = pool.Rent(length);
await action(array.AsMemory(0, length));
_pool = pool;
_array = array;
}
finally

public void Dispose()
{
if (array != null)
if (_array != null)
{
pool.Return(array);
_pool.Return(_array);
_array = null;
}
}
}
Expand Down
46 changes: 23 additions & 23 deletions csharp/src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,47 +42,47 @@ public ArrowFileReaderImplementation(Stream stream, MemoryAllocator allocator, I
{
}

public async ValueTask<int> RecordBatchCountAsync()
public async ValueTask<int> RecordBatchCountAsync(CancellationToken cancellationToken = default)
{
if (!HasReadSchema)
{
await ReadSchemaAsync().ConfigureAwait(false);
await ReadSchemaAsync(cancellationToken).ConfigureAwait(false);
}

return _footer.RecordBatchCount;
}

protected override async ValueTask ReadSchemaAsync()
protected override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken = default)
{
if (HasReadSchema)
{
return;
}

await ValidateFileAsync().ConfigureAwait(false);
await ValidateFileAsync(cancellationToken).ConfigureAwait(false);

int footerLength = 0;
await ArrayPool<byte>.Shared.RentReturnAsync(4, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> buffer))
{
BaseStream.Position = GetFooterLengthPosition();

int bytesRead = await BaseStream.ReadFullBufferAsync(buffer).ConfigureAwait(false);
int bytesRead = await BaseStream.ReadFullBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
EnsureFullRead(buffer, bytesRead);

footerLength = ReadFooterLength(buffer);
}).ConfigureAwait(false);
}

await ArrayPool<byte>.Shared.RentReturnAsync(footerLength, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(footerLength, out Memory<byte> buffer))
{
long footerStartPosition = GetFooterLengthPosition() - footerLength;

BaseStream.Position = footerStartPosition;

int bytesRead = await BaseStream.ReadFullBufferAsync(buffer).ConfigureAwait(false);
int bytesRead = await BaseStream.ReadFullBufferAsync(buffer, cancellationToken).ConfigureAwait(false);
EnsureFullRead(buffer, bytesRead);

ReadSchema(buffer);
}).ConfigureAwait(false);
}
}

protected override void ReadSchema()
Expand All @@ -95,17 +95,17 @@ protected override void ReadSchema()
ValidateFile();

int footerLength = 0;
ArrayPool<byte>.Shared.RentReturn(4, (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(4, out Memory<byte> buffer))
{
BaseStream.Position = GetFooterLengthPosition();

int bytesRead = BaseStream.ReadFullBuffer(buffer);
EnsureFullRead(buffer, bytesRead);

footerLength = ReadFooterLength(buffer);
});
}

ArrayPool<byte>.Shared.RentReturn(footerLength, (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(footerLength, out Memory<byte> buffer))
{
long footerStartPosition = GetFooterLengthPosition() - footerLength;

Expand All @@ -115,7 +115,7 @@ protected override void ReadSchema()
EnsureFullRead(buffer, bytesRead);

ReadSchema(buffer);
});
}
}

private long GetFooterLengthPosition()
Expand Down Expand Up @@ -239,14 +239,14 @@ private void ReadDictionaries()
/// <summary>
/// Check if file format is valid. If it's valid don't run the validation again.
/// </summary>
private async ValueTask ValidateFileAsync()
private async ValueTask ValidateFileAsync(CancellationToken cancellationToken = default)
{
if (IsFileValid)
{
return;
}

await ValidateMagicAsync().ConfigureAwait(false);
await ValidateMagicAsync(cancellationToken).ConfigureAwait(false);

IsFileValid = true;
}
Expand All @@ -266,31 +266,31 @@ private void ValidateFile()
IsFileValid = true;
}

private async ValueTask ValidateMagicAsync()
private async ValueTask ValidateMagicAsync(CancellationToken cancellationToken = default)
{
long startingPosition = BaseStream.Position;
int magicLength = ArrowFileConstants.Magic.Length;

try
{
await ArrayPool<byte>.Shared.RentReturnAsync(magicLength, async (buffer) =>
using (ArrayPool<byte>.Shared.RentReturn(magicLength, out Memory<byte> buffer))
{
// Seek to the beginning of the stream
BaseStream.Position = 0;

// Read beginning of stream
await BaseStream.ReadAsync(buffer).ConfigureAwait(false);
await BaseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);

VerifyMagic(buffer);

// Move stream position to magic-length bytes away from the end of the stream
BaseStream.Position = BaseStream.Length - magicLength;

// Read the end of the stream
await BaseStream.ReadAsync(buffer).ConfigureAwait(false);
await BaseStream.ReadAsync(buffer, cancellationToken).ConfigureAwait(false);

VerifyMagic(buffer);
}).ConfigureAwait(false);
}
}
finally
{
Expand All @@ -305,7 +305,7 @@ private void ValidateMagic()

try
{
ArrayPool<byte>.Shared.RentReturn(magicLength, buffer =>
using (ArrayPool<byte>.Shared.RentReturn(magicLength, out Memory<byte> buffer))
{
// Seek to the beginning of the stream
BaseStream.Position = 0;
Expand All @@ -322,7 +322,7 @@ private void ValidateMagic()
BaseStream.Read(buffer);

VerifyMagic(buffer);
});
}
}
finally
{
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private void WriteFooter(Schema schema)

// Write footer length

Buffers.RentReturn(4, (buffer) =>
using (Buffers.RentReturn(4, out Memory<byte> buffer))
{
int footerLength;
checked
Expand All @@ -226,7 +226,7 @@ private void WriteFooter(Schema schema)
BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

BaseStream.Write(buffer);
});
}

// Write magic

Expand Down Expand Up @@ -286,7 +286,7 @@ private async Task WriteFooterAsync(Schema schema, CancellationToken cancellatio

cancellationToken.ThrowIfCancellationRequested();

await Buffers.RentReturnAsync(4, async (buffer) =>
using (Buffers.RentReturn(4, out Memory<byte> buffer))
{
int footerLength;
checked
Expand All @@ -297,7 +297,7 @@ await Buffers.RentReturnAsync(4, async (buffer) =>
BinaryPrimitives.WriteInt32LittleEndian(buffer.Span, footerLength);

await BaseStream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
}).ConfigureAwait(false);
}

// Write magic

Expand Down
Loading
Loading