Skip to content

Commit

Permalink
Pull 2.1.3 - 2.1.5 releases
Browse files Browse the repository at this point in the history
Summary:
This diff grabs changes from original connector that were made in 2.1.3, 2.1.4 and 2.1.5 releases:
- Add better FormatException message for Guid types
- Update StyleCop version
- Update Microsoft.NET.Test.Sdk to 17.0.0
- Loop to read all data when decompressing. Fixes mysql-net#1120
- Eliminate unnecessary allocations for constant payloads
- Fix extra indentation (ConnectionTests.cs, DataAdapterTests.cs, SslTests.cs)
- Speed up inserts with MySqlDataAdapter. Fixes mysql-net#1124
- Include "special" bytes in blob insert test to ensure that these values are escaped properly

Test Plan: https://app.circleci.com/pipelines/github/memsql/SingleStoreNETConnector/239/workflows/1facf76f-076a-47d5-86d6-2cb4d8a887a9

Reviewers: pmishchenko-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6408

Differential Revision: https://grizzly.internal.memcompute.com/D61592
  • Loading branch information
okramarenko committed Mar 22, 2023
1 parent cc02b45 commit 6d1d8ac
Show file tree
Hide file tree
Showing 13 changed files with 636 additions and 375 deletions.
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>-->
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.354">
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.376">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
Expand Down
4 changes: 2 additions & 2 deletions src/SingleStoreConnector/Core/BinaryRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ protected override object GetValueCore(ReadOnlySpan<byte> data, ColumnDefinition
ProtocolUtility.GetBytesPerCharacter(columnDefinition.CharacterSet);

if (Connection.GuidFormat == SingleStoreGuidFormat.Char36 && columnLen == 36)
return Utf8Parser.TryParse(data, out Guid guid, out int guid36BytesConsumed, 'D') && guid36BytesConsumed == 36 ? guid : throw new FormatException();
return Utf8Parser.TryParse(data, out Guid guid, out int guid36BytesConsumed, 'D') && guid36BytesConsumed == 36 ? guid : throw new FormatException("Could not parse CHAR(36) value as Guid: {0}".FormatInvariant(Encoding.UTF8.GetString(data)));
if (Connection.GuidFormat == SingleStoreGuidFormat.Char32 && columnLen == 32)
return Utf8Parser.TryParse(data, out Guid guid, out int guid32BytesConsumed, 'N') && guid32BytesConsumed == 32 ? guid : throw new FormatException();
return Utf8Parser.TryParse(data, out Guid guid, out int guid32BytesConsumed, 'N') && guid32BytesConsumed == 32 ? guid : throw new FormatException("Could not parse CHAR(32) value as Guid: {0}".FormatInvariant(Encoding.UTF8.GetString(data)));
if (Connection.TreatChar48AsGeographyPoint && columnLen == 48)
goto case ColumnType.GeographyPoint;
if (columnLen == 1431655765)
Expand Down
10 changes: 7 additions & 3 deletions src/SingleStoreConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public void FinishQuerying()
// In order to handle this case, we issue a dummy query that will consume the pending cancellation.
// See https://bugs.mysql.com/bug.php?id=45679
Log.Debug("Session{0} sending 'SELECT SLEEP(0) INTO @dummy' command to clear pending cancellation", m_logArguments);
var payload = QueryPayload.Create(SupportsQueryAttributes, "SELECT SLEEP(0) INTO @dummy;");
var payload = SupportsQueryAttributes ? s_sleepWithAttributesPayload : s_sleepNoAttributesPayload;
#pragma warning disable CA2012 // Safe because method completes synchronously
SendAsync(payload, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
payload = ReceiveReplyAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
Expand Down Expand Up @@ -1621,7 +1621,8 @@ private async Task GetRealServerDetailsAsync(IOBehavior ioBehavior, Cancellation
Log.Debug("Session{0} is getting CONNECTION_ID(), VERSION(), S2Version from server", m_logArguments);
try
{
await SendAsync(QueryPayload.Create(SupportsQueryAttributes, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version;"), ioBehavior, cancellationToken).ConfigureAwait(false);
var payload = SupportsQueryAttributes ? s_selectConnectionIdVersionWithAttributesPayload : s_selectConnectionIdVersionNoAttributesPayload;
await SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);

// column count: 3
await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
Expand All @@ -1631,7 +1632,6 @@ private async Task GetRealServerDetailsAsync(IOBehavior ioBehavior, Cancellation
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);

PayloadData payload;
if (!SupportsDeprecateEof)
{
payload = await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
Expand Down Expand Up @@ -1916,6 +1916,10 @@ protected override void OnStatementBegin(int index)
static readonly PayloadData s_setNamesUtf8mb4NoAttributesPayload = QueryPayload.Create(false, "SET NAMES utf8mb4;");
static readonly PayloadData s_setNamesUtf8WithAttributesPayload = QueryPayload.Create(true, "SET NAMES utf8;");
static readonly PayloadData s_setNamesUtf8mb4WithAttributesPayload = QueryPayload.Create(true, "SET NAMES utf8mb4;");
static readonly PayloadData s_sleepNoAttributesPayload = QueryPayload.Create(false, "SELECT SLEEP(0) INTO @dummy;");
static readonly PayloadData s_sleepWithAttributesPayload = QueryPayload.Create(true, "SELECT SLEEP(0) INTO @dummy;");
static readonly PayloadData s_selectConnectionIdVersionNoAttributesPayload = QueryPayload.Create(false, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version;");
static readonly PayloadData s_selectConnectionIdVersionWithAttributesPayload = QueryPayload.Create(true, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version;");
static int s_lastId;

readonly object m_lock;
Expand Down
4 changes: 2 additions & 2 deletions src/SingleStoreConnector/Core/TextRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ protected override object GetValueCore(ReadOnlySpan<byte> data, ColumnDefinition
var columnLen = columnDefinition.ColumnLength /
ProtocolUtility.GetBytesPerCharacter(columnDefinition.CharacterSet);
if (Connection.GuidFormat == SingleStoreGuidFormat.Char36 && columnLen == 36)
return Utf8Parser.TryParse(data, out Guid guid, out int guid36BytesConsumed, 'D') && guid36BytesConsumed == 36 ? guid : throw new FormatException();
return Utf8Parser.TryParse(data, out Guid guid, out int guid36BytesConsumed, 'D') && guid36BytesConsumed == 36 ? guid : throw new FormatException("Could not parse CHAR(36) value as Guid: {0}".FormatInvariant(Encoding.UTF8.GetString(data)));
if (Connection.GuidFormat == SingleStoreGuidFormat.Char32 && columnLen == 32)
return Utf8Parser.TryParse(data, out Guid guid, out int guid32BytesConsumed, 'N') && guid32BytesConsumed == 32 ? guid : throw new FormatException();
return Utf8Parser.TryParse(data, out Guid guid, out int guid32BytesConsumed, 'N') && guid32BytesConsumed == 32 ? guid : throw new FormatException("Could not parse CHAR(32) value as Guid: {0}".FormatInvariant(Encoding.UTF8.GetString(data)));
if (Connection.TreatChar48AsGeographyPoint && columnLen == 48)
goto case ColumnType.GeographyPoint;
if (columnLen == 1431655765)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public static InitialHandshakePayload Create(ReadOnlySpan<byte> span)
var authPluginDataLength = reader.ReadByte();
reader.Offset += 6;

long extendedCapabilites = reader.ReadInt32();
long extendedCapabilities = reader.ReadInt32();
if ((protocolCapabilities & ProtocolCapabilities.LongPassword) == 0)
{
// MariaDB clears the CLIENT_LONG_PASSWORD flag to indicate it's not a MySQL Server
protocolCapabilities |= (ProtocolCapabilities) (extendedCapabilites << 32);
protocolCapabilities |= (ProtocolCapabilities) (extendedCapabilities << 32);
}

if ((protocolCapabilities & ProtocolCapabilities.SecureConnection) != 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,15 @@ private ValueTask<int> ReadBytesAsync(Memory<byte> buffer, ProtocolErrorBehavior
var uncompressedData = new byte[uncompressedLength];
using var compressedStream = new MemoryStream(payloadReadBytes.Array!, payloadReadBytes.Offset, payloadReadBytes.Count);
using var decompressingStream = new ZLibStream(compressedStream, CompressionMode.Decompress);
var bytesRead = decompressingStream.Read(uncompressedData, 0, uncompressedLength);
m_remainingData = new(uncompressedData, 0, bytesRead);
int bytesRead, totalBytesRead = 0;
do
{
bytesRead = decompressingStream.Read(uncompressedData, totalBytesRead, uncompressedLength - totalBytesRead);
totalBytesRead += bytesRead;
} while (bytesRead > 0);
if (totalBytesRead != uncompressedLength && protocolErrorBehavior == ProtocolErrorBehavior.Throw)
return ValueTaskExtensions.FromException<int>(new InvalidOperationException("Expected to read {0:n0} uncompressed bytes but only read {1:n0}".FormatInvariant(uncompressedLength, totalBytesRead)));
m_remainingData = new(uncompressedData, 0, totalBytesRead);
#else
// check CMF (Compression Method and Flags) and FLG (Flags) bytes for expected values
var cmf = payloadReadBytes.Array![payloadReadBytes.Offset];
Expand All @@ -151,10 +158,17 @@ private ValueTask<int> ReadBytesAsync(Memory<byte> buffer, ProtocolErrorBehavior
var uncompressedData = new byte[uncompressedLength];
using var compressedStream = new MemoryStream(payloadReadBytes.Array, payloadReadBytes.Offset + headerSize, payloadReadBytes.Count - headerSize - checksumSize);
using var decompressingStream = new DeflateStream(compressedStream, CompressionMode.Decompress);
var bytesRead = decompressingStream.Read(uncompressedData, 0, uncompressedLength);
m_remainingData = new(uncompressedData, 0, bytesRead);
int bytesRead, totalBytesRead = 0;
do
{
bytesRead = decompressingStream.Read(uncompressedData, totalBytesRead, uncompressedLength - totalBytesRead);
totalBytesRead += bytesRead;
} while (bytesRead > 0);
if (totalBytesRead != uncompressedLength && protocolErrorBehavior == ProtocolErrorBehavior.Throw)
return ValueTaskExtensions.FromException<int>(new InvalidOperationException("Expected to read {0:n0} uncompressed bytes but only read {1:n0}".FormatInvariant(uncompressedLength, totalBytesRead)));
m_remainingData = new(uncompressedData, 0, totalBytesRead);

var checksum = Adler32.Calculate(uncompressedData, 0, (uint)bytesRead);
var checksum = Adler32.Calculate(uncompressedData.AsSpan(0, totalBytesRead));

var adlerStartOffset = payloadReadBytes.Offset + payloadReadBytes.Count - 4;
if (payloadReadBytes.Array[adlerStartOffset + 0] != ((checksum >> 24) & 0xFF) ||
Expand Down Expand Up @@ -204,7 +218,7 @@ private ValueTask<int> CompressAndWrite(ArraySegment<byte> remainingUncompressed
using (var deflateStream = new DeflateStream(compressedStream, CompressionLevel.Optimal, leaveOpen: true))
deflateStream.Write(remainingUncompressedData.Array!, remainingUncompressedData.Offset, remainingUncompressedBytes);

var checksum = Adler32.Calculate(remainingUncompressedData.Array!, (uint)remainingUncompressedData.Offset, (uint)remainingUncompressedBytes);
var checksum = Adler32.Calculate(remainingUncompressedData.AsSpan(0, remainingUncompressedBytes));
compressedStream.WriteByte((byte) ((checksum >> 24) & 0xFF));
compressedStream.WriteByte((byte) ((checksum >> 16) & 0xFF));
compressedStream.WriteByte((byte) ((checksum >> 8) & 0xFF));
Expand Down
134 changes: 133 additions & 1 deletion src/SingleStoreConnector/SingleStoreDataAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
using System.Globalization;
using System.Text;
using System.Text.RegularExpressions;
using SingleStoreConnector.Core;

namespace SingleStoreConnector;

public sealed class SingleStoreDataAdapter : DbDataAdapter
Expand Down Expand Up @@ -96,7 +101,134 @@ protected override int AddToBatch(IDbCommand command)

protected override void ClearBatch() => m_batch!.BatchCommands.Clear();

protected override int ExecuteBatch() => m_batch!.ExecuteNonQuery();
protected override int ExecuteBatch()
{
if (TryConvertToCommand(m_batch!) is SingleStoreCommand command)
{
command.Connection = m_batch!.Connection;
command.Transaction = m_batch.Transaction;
return command.ExecuteNonQuery();
}
else
{
return m_batch!.ExecuteNonQuery();
}
}

// Detects if the commands in 'batch' are all "INSERT" commands that can be combined into one large value list;
// returns a SingleStoreCommand with the combined SQL if so.
internal static SingleStoreCommand? TryConvertToCommand(SingleStoreBatch batch)
{
// ensure there are at least two commands
if (batch.BatchCommands.Count < 1)
return null;

// check for a parameterized command
var firstCommand = batch.BatchCommands[0];
if (firstCommand.Parameters.Count == 0)
return null;
firstCommand.Batch = batch;

// check that all commands have the same SQL
var sql = firstCommand.CommandText;
for (var i = 1; i < batch.BatchCommands.Count; i++)
{
if (batch.BatchCommands[i].CommandText != sql)
return null;
}

// check that it's an INSERT statement
if (!sql.StartsWith("INSERT INTO ", StringComparison.OrdinalIgnoreCase))
return null;

// check for "VALUES(...)" clause
var match = Regex.Match(sql, @"\bVALUES\s*\([^)]+\)\s*;?\s*$", RegexOptions.Singleline | RegexOptions.IgnoreCase | RegexOptions.CultureInvariant);
if (!match.Success)
return null;

// extract the parameters
var parser = new InsertSqlParser(firstCommand);
parser.Parse(sql);

// record the parameter indexes that were found
foreach (var parameterIndex in parser.ParameterIndexes)
{
if (parameterIndex < 0 || parameterIndex >= firstCommand.Parameters.Count)
return null;
}

// ensure that the VALUES(...) clause contained only parameters, and that all were consumed
var remainingValues = parser.CommandText.Substring(match.Index + 6).Trim();
remainingValues = remainingValues.TrimEnd(';').Trim().TrimStart('(').TrimEnd(')');
remainingValues = remainingValues.Replace(",", "");
if (!string.IsNullOrWhiteSpace(remainingValues))
return null;

// build one INSERT statement with concatenated VALUES
var combinedCommand = new SingleStoreCommand();
var sqlBuilder = new StringBuilder(sql.Substring(0, match.Index + 6));
var combinedParameterIndex = 0;
for (var i = 0; i < batch.BatchCommands.Count; i++)
{
var command = batch.BatchCommands[i];
if (i != 0)
sqlBuilder.Append(',');
sqlBuilder.Append('(');

for (var parameterIndex = 0; parameterIndex < parser.ParameterIndexes.Count; parameterIndex++)
{
if (parameterIndex != 0)
sqlBuilder.Append(',');
var parameterName = "@p" + combinedParameterIndex.ToString(CultureInfo.InvariantCulture);
sqlBuilder.Append(parameterName);
combinedParameterIndex++;
var parameter = command.Parameters[parser.ParameterIndexes[parameterIndex]].Clone();
parameter.ParameterName = parameterName;
combinedCommand.Parameters.Add(parameter);
}

sqlBuilder.Append(')');
}
sqlBuilder.Append(';');

combinedCommand.CommandText = sqlBuilder.ToString();
return combinedCommand;
}

internal sealed class InsertSqlParser : SqlParser
{
public InsertSqlParser(ISingleStoreCommand command)
: base(new StatementPreparer(command.CommandText!, null, command.CreateStatementPreparerOptions()))
{
CommandText = command.CommandText!;
m_parameters = command.RawParameters;
ParameterIndexes = new();
}

public List<int> ParameterIndexes { get; }

public string CommandText { get; private set; }

protected override void OnNamedParameter(int index, int length)
{
var name = CommandText.Substring(index, length);
var parameterIndex = m_parameters?.NormalizedIndexOf(name) ?? -1;
ParameterIndexes.Add(parameterIndex);

// overwrite the parameter name with spaces
CommandText = CommandText.Substring(0, index) + new string(' ', length) + CommandText.Substring(index + length);
}

protected override void OnPositionalParameter(int index)
{
ParameterIndexes.Add(ParameterIndexes.Count);

// overwrite the parameter placeholder with a space
CommandText = CommandText.Substring(0, index) + " " + CommandText.Substring(index + 1);
}

readonly SingleStoreParameterCollection? m_parameters;
}

SingleStoreBatch? m_batch;
}
Expand Down
26 changes: 15 additions & 11 deletions src/SingleStoreConnector/Utilities/Adler32.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright (c) Six Labors.
// Licensed under the Apache License, Version 2.0.
// https://github.com/SixLabors/ImageSharp/blob/master/src/ImageSharp/Formats/Png/Zlib/Adler32.cs
// https://github.com/SixLabors/ImageSharp/blob/master/src/ImageSharp/Compression/Zlib/Adler32.cs

#if !NET6_0_OR_GREATER
using System.Runtime.CompilerServices;
#if NETCOREAPP3_0_OR_GREATER
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;
Expand Down Expand Up @@ -44,36 +45,37 @@ internal static class Adler32
/// Calculates the Adler32 checksum with the bytes taken from the span.
/// </summary>
/// <param name="buffer">The readonly span of bytes.</param>
/// <param name="offset">The offset.</param>
/// <param name="length">The length.</param>
/// <returns>The <see cref="uint"/>.</returns>
public static uint Calculate(ReadOnlySpan<byte> buffer, uint offset, uint length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static uint Calculate(ReadOnlySpan<byte> buffer)
{
if (buffer.Length == 0)
if (buffer.IsEmpty)
{
return SeedValue;
}

#if NETCOREAPP3_0_OR_GREATER
if (Ssse3.IsSupported && buffer.Length >= MinBufferSize)
{
return CalculateSse(buffer, offset, length);
return CalculateSse(buffer);
}
#endif

return CalculateScalar(buffer, offset, length);
return CalculateScalar(buffer);
}

#if NETCOREAPP3_0_OR_GREATER
// Based on https://github.com/chromium/chromium/blob/master/third_party/zlib/adler32_simd.c
private static unsafe uint CalculateSse(ReadOnlySpan<byte> buffer, uint offset, uint length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static unsafe uint CalculateSse(ReadOnlySpan<byte> buffer)
{
uint s1 = SeedValue & 0xFFFF;
uint s2 = (SeedValue >> 16) & 0xFFFF;

// Process the data in blocks.
const int BLOCK_SIZE = 1 << 5;

uint length = (uint)buffer.Length;
uint blocks = length / BLOCK_SIZE;
length -= blocks * BLOCK_SIZE;

Expand All @@ -82,7 +84,7 @@ private static unsafe uint CalculateSse(ReadOnlySpan<byte> buffer, uint offset,
fixed (byte* tapPtr = Tap1Tap2)
{
index += (int)blocks * BLOCK_SIZE;
var localBufferPtr = bufferPtr + offset;
var localBufferPtr = bufferPtr;

// _mm_setr_epi8 on x86
Vector128<sbyte> tap1 = Sse2.LoadVector128((sbyte*)tapPtr);
Expand Down Expand Up @@ -192,15 +194,17 @@ private static unsafe uint CalculateSse(ReadOnlySpan<byte> buffer, uint offset,
}
#endif

private static unsafe uint CalculateScalar(ReadOnlySpan<byte> buffer, uint offset, uint length)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static unsafe uint CalculateScalar(ReadOnlySpan<byte> buffer)
{
uint s1 = SeedValue & 0xFFFF;
uint s2 = (SeedValue >> 16) & 0xFFFF;
uint k;

fixed (byte* bufferPtr = buffer)
{
var localBufferPtr = bufferPtr + offset;
var localBufferPtr = bufferPtr;
uint length = (uint) buffer.Length;

while (length > 0)
{
Expand Down
Loading

0 comments on commit 6d1d8ac

Please sign in to comment.