Skip to content

Commit

Permalink
Kill queries specifying node_id
Browse files Browse the repository at this point in the history
Summary:
Add logging for QueryInterrupted exceptions which aren't covered by the connector.
Get rid of 7.3 and add 8.0 and 8.1 to CI. Compat level for test.

Test Plan: https://app.circleci.com/pipelines/github/memsql/SingleStoreNETConnector/292/workflows/b321683d-4e84-4f02-8ea1-778fe49dd1da

Reviewers: adam, mshcherbina-ua, rodrigo

Reviewed By: mshcherbina-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6674

Differential Revision: https://grizzly.internal.memcompute.com/D63860
  • Loading branch information
pmishchenko-ua committed Jul 24, 2023
1 parent 35d1ed3 commit 46c04a8
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 31 deletions.
18 changes: 12 additions & 6 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ workflows:
version: 2
build_and_test:
jobs:
- test-ubuntu:
name: Test 8.1 cluster-in-a-box
matrix:
parameters:
singlestore_image:
- singlestore/cluster-in-a-box:alma-8.1.12-93bb04ec6c-4.0.13-1.17.0
- test-ubuntu:
name: Test 8.0 cluster-in-a-box
matrix:
parameters:
singlestore_image:
- singlestore/cluster-in-a-box:alma-8.0.19-f48780d261-4.0.11-1.16.0
- test-ubuntu:
name: Test 7.8 cluster-in-a-box
matrix:
Expand All @@ -156,12 +168,6 @@ workflows:
parameters:
singlestore_image:
- singlestore/cluster-in-a-box:centos-7.5.12-3112a491c2-4.0.0-1.12.5
- test-ubuntu:
name: Test 7.3 cluster-in-a-box
matrix:
parameters:
singlestore_image:
- singlestore/cluster-in-a-box:centos-7.3.13-761e3259b3-3.2.11-1.11.9
- test-windows:
name: Test S2MS on Windows
publish:
Expand Down
2 changes: 2 additions & 0 deletions .circleci/setup_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ singlestore-wait-start() {
sleep 0.2
done
mysql -u root -h 127.0.0.1 -P 3306 -p"${SQL_USER_PASSWORD}" -e "create database if not exists singlestoretest" >/dev/null 2>/dev/null
mysql -u root -h 127.0.0.1 -P 3306 -p"${SQL_USER_PASSWORD}" -e "set global data_conversion_compatibility_level='6.0'" >/dev/null 2>/dev/null

echo ". Success!"
}

Expand Down
11 changes: 9 additions & 2 deletions src/SingleStoreConnector/Core/CommandExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ public static async Task<SingleStoreDataReader> ExecuteReaderAsync(IReadOnlyList
try
{
await connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
return await SingleStoreDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures, command, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
return await SingleStoreDataReader.CreateAsync(commandListPosition, payloadCreator, cachedProcedures,
command, behavior, activity, ioBehavior, cancellationToken).ConfigureAwait(false);
}
catch (SingleStoreException ex) when (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
catch (SingleStoreException ex) when (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted &&
cancellationToken.IsCancellationRequested)
{
Log.Info("Session{0} query was interrupted", connection.Session.Id);
throw new OperationCanceledException(ex.Message, ex, cancellationToken);
Expand All @@ -67,6 +69,11 @@ public static async Task<SingleStoreDataReader> ExecuteReaderAsync(IReadOnlyList
int megabytes = payload.Span.Length / 1_000_000;
throw new SingleStoreException($"Error submitting {megabytes}MB packet; ensure 'max_allowed_packet' is greater than {megabytes}MB.", ex);
}
catch (SingleStoreException ex) when (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted)
{
Log.Trace("Session{0} got QueryInterrupted exception, but not because of the CommandTimeout or CancellationToken (CommandExecutor.cs)", connection.Session.Id);
throw;
}
}
catch (Exception ex) when (activity is { IsAllDataRequested: true })
{
Expand Down
4 changes: 4 additions & 0 deletions src/SingleStoreConnector/Core/ResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics;
using System.Globalization;
using System.Runtime.ExceptionServices;
using SingleStoreConnector.Logging;
using SingleStoreConnector.Protocol;
using SingleStoreConnector.Protocol.Payloads;
using SingleStoreConnector.Protocol.Serialization;
Expand Down Expand Up @@ -250,6 +251,8 @@ public async Task<bool> ReadAsync(IOBehavior ioBehavior, CancellationToken cance
throw new OperationCanceledException(ex.Message, ex, token);
if (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted && resultSet.Command.CancellableCommand.IsTimedOut)
throw SingleStoreException.CreateForTimeout(ex);
if (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted)
Log.Trace("Got QueryInterrupted exception, but not because of the CommandTimeout or CancellationToken (ResultSet.cs)");
throw;
}
return ScanRowAsyncRemainder(resultSet, payloadData, row);
Expand Down Expand Up @@ -358,6 +361,7 @@ public Row GetCurrentRow()
return m_row ?? throw new InvalidOperationException("There is no current row.");
}

private static readonly ISingleStoreConnectorLogger Log = SingleStoreConnectorLogManager.CreateLogger(nameof(ResultSet));
public SingleStoreDataReader DataReader { get; }
public ExceptionDispatchInfo? ReadResultSetHeaderException { get; private set; }
public ISingleStoreCommand Command => DataReader.Command!;
Expand Down
52 changes: 42 additions & 10 deletions src/SingleStoreConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public ServerSession(ConnectionPool? pool, int poolGeneration, int id)
// SingleStore Server version
public ServerVersion S2ServerVersion { get; set; }

public int AggregatorId { get; private set; }
public int ActiveCommandId { get; private set; }
public int CancellationTimeout { get; private set; }
public int ConnectionId { get; set; }
Expand All @@ -63,7 +64,8 @@ public ServerSession(ConnectionPool? pool, int poolGeneration, int id)
public uint LastReturnedTicks { get; private set; }
public string? DatabaseOverride { get; set; }
public string HostName { get; private set; }
public IPAddress? IPAddress => (m_tcpClient?.Client.RemoteEndPoint as IPEndPoint)?.Address;
public IPEndPoint? IPEndPoint => m_tcpClient?.Client.RemoteEndPoint as IPEndPoint;
public string? UserID { get; private set; }
public WeakReference<SingleStoreConnection>? OwningConnection { get; set; }
public bool SupportsComMulti => m_supportsComMulti;
public bool SupportsDeprecateEof => m_supportsDeprecateEof;
Expand Down Expand Up @@ -320,8 +322,22 @@ public void FinishQuerying()
Log.Debug("Session{0} sending 'SELECT SLEEP(0) INTO @dummy' command to clear pending cancellation", m_logArguments);
var payload = SupportsQueryAttributes ? s_sleepWithAttributesPayload : s_sleepNoAttributesPayload;
#pragma warning disable CA2012 // Safe because method completes synchronously
SendAsync(payload, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
payload = ReceiveReplyAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
try
{
SendAsync(payload, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
payload = ReceiveReplyAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}
catch (SingleStoreException ex)
{
if (ex.ErrorCode == SingleStoreErrorCode.QueryInterrupted)
{
Log.Debug("Session{0} cancelled dummy-command to clear pending cancellation", m_logArguments);
}
else
{
Log.Debug("Session{0} failed dummy-command to clear pending cancellation", m_logArguments);
}
}
#pragma warning restore CA2012
OkPayload.Create(payload.Span, SupportsDeprecateEof, SupportsSessionTrack);
}
Expand Down Expand Up @@ -1676,16 +1692,17 @@ private bool ShouldGetRealServerDetails(ConnectionSettings cs)

private async Task GetRealServerDetailsAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
Log.Debug("Session{0} is getting CONNECTION_ID(), VERSION(), S2Version from server", m_logArguments);
Log.Debug("Session{0} is getting CONNECTION_ID(), VERSION(), S2Version, aggregator_id from server", m_logArguments);
try
{
var payload = SupportsQueryAttributes ? s_selectConnectionIdVersionWithAttributesPayload : s_selectConnectionIdVersionNoAttributesPayload;
await SendAsync(payload, ioBehavior, cancellationToken).ConfigureAwait(false);

// column count: 3
// column count: 4
await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

// CONNECTION_ID(), VERSION() and @@memsql_version columns
// CONNECTION_ID(), VERSION(), @@memsql_version and @@aggregator_id columns
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
Expand All @@ -1698,7 +1715,7 @@ private async Task GetRealServerDetailsAsync(IOBehavior ioBehavior, Cancellation

// first (and only) row
payload = await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
static void ReadRow(ReadOnlySpan<byte> span, out int? connectionId, out byte[] serverVersion, out byte[] s2Version)
static void ReadRow(ReadOnlySpan<byte> span, out int? connectionId, out byte[] serverVersion, out byte[] s2Version, out int? aggregator_id)
{
var reader = new ByteArrayReader(span);
var length = reader.ReadLengthEncodedIntegerOrNull();
Expand All @@ -1713,8 +1730,13 @@ static void ReadRow(ReadOnlySpan<byte> span, out int? connectionId, out byte[] s
#pragma warning disable CA1825 // Avoid zero-length array allocations
s2Version = length != -1 ? reader.ReadByteString(length).ToArray() : new byte[0];
#pragma warning restore CA1825 // Avoid zero-length array allocations

length = reader.ReadLengthEncodedIntegerOrNull();
#pragma warning disable CA1825 // Avoid zero-length array allocations
aggregator_id = (length != -1 && Utf8Parser.TryParse(reader.ReadByteString(length), out int node_id, out _)) ? node_id : default(int?);
#pragma warning restore CA1825 // Avoid zero-length array allocations
}
ReadRow(payload.Span, out var connectionId, out var serverVersion, out var s2Version);
ReadRow(payload.Span, out var connectionId, out var serverVersion, out var s2Version, out var aggregator_id);

// OK/EOF payload
payload = await ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
Expand All @@ -1741,6 +1763,16 @@ static void ReadRow(ReadOnlySpan<byte> span, out int? connectionId, out byte[] s
Log.Debug("Session{0} setting S2ServerVersion to {2}", m_logArguments[0], S2ServerVersion.OriginalString, newS2Version.OriginalString);
S2ServerVersion = newS2Version;
}
if (aggregator_id.HasValue)
{
Log.Debug("Session{0} setting AggregatorId to {2}", m_logArguments[0], aggregator_id.Value);
AggregatorId = aggregator_id.Value;
}
else
{
// dummy value, @@aggregator_id should always be set
AggregatorId = -1;
}
}
catch (SingleStoreException ex)
{
Expand Down Expand Up @@ -1985,8 +2017,8 @@ protected override void OnStatementBegin(int index)
private static readonly PayloadData s_setNamesUtf8mb4WithAttributesPayload = QueryPayload.Create(true, "SET NAMES utf8mb4;"u8);
private static readonly PayloadData s_sleepNoAttributesPayload = QueryPayload.Create(false, "SELECT SLEEP(0) INTO @dummy;"u8);
private static readonly PayloadData s_sleepWithAttributesPayload = QueryPayload.Create(true, "SELECT SLEEP(0) INTO @dummy;"u8);
private static readonly PayloadData s_selectConnectionIdVersionNoAttributesPayload = QueryPayload.Create(false, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version;"u8);
private static readonly PayloadData s_selectConnectionIdVersionWithAttributesPayload = QueryPayload.Create(true, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version;"u8);
private static readonly PayloadData s_selectConnectionIdVersionNoAttributesPayload = QueryPayload.Create(false, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version, @@aggregator_id;"u8);
private static readonly PayloadData s_selectConnectionIdVersionWithAttributesPayload = QueryPayload.Create(true, "SELECT CONNECTION_ID(), VERSION(), @@memsql_version, @@aggregator_id;"u8);
private static int s_lastId;

private readonly object m_lock;
Expand Down
14 changes: 12 additions & 2 deletions src/SingleStoreConnector/SingleStoreConnection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.Net.Security;
using System.Net.Sockets;
using System.Security.Authentication;
Expand Down Expand Up @@ -763,14 +764,23 @@ internal void Cancel(ICancellableCommand command, int commandId, bool isCancel)
AutoEnlist = false,
Pooling = false,
};
if (m_session?.IPAddress is { } ipAddress)
if (m_session.IPEndPoint is { Address: { } ipAddress, Port: { } port } )
{
csb.Server = ipAddress.ToString();
csb.Port = (uint) port;
}
csb.UserID = m_session.UserID;
var cancellationTimeout = GetConnectionSettings().CancellationTimeout;
csb.ConnectionTimeout = cancellationTimeout < 1 ? 3u : (uint) cancellationTimeout;

using var connection = CloneWith(csb.ConnectionString);
connection.Open();
using var killCommand = new SingleStoreCommand("KILL QUERY {0}".FormatInvariant(command.Connection!.ServerThread), connection);
#if NET6_0_OR_GREATER
var killQuerySql = string.Create(CultureInfo.InvariantCulture, $"KILL QUERY {command.Connection!.ServerThread} {m_session.AggregatorId}");
#else
var killQuerySql = FormattableString.Invariant($"KILL QUERY {command.Connection!.ServerThread} {m_session.AggregatorId}");
#endif
using var killCommand = new SingleStoreCommand(killQuerySql, connection);
killCommand.CommandTimeout = cancellationTimeout < 1 ? 3 : cancellationTimeout;
m_session?.DoCancel(command, killCommand);
}
Expand Down
3 changes: 3 additions & 0 deletions src/SingleStoreConnector/SingleStoreDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ private void ActivateResultSet(CancellationToken cancellationToken)
if (mySqlException?.ErrorCode == SingleStoreErrorCode.QueryInterrupted && Command!.CancellableCommand.IsTimedOut)
throw SingleStoreException.CreateForTimeout(mySqlException);

if (mySqlException?.ErrorCode == SingleStoreErrorCode.QueryInterrupted)
Log.Trace("Session{0} got QueryInterrupted exception, but not because of the CommandTimeout or CancellationToken (SingleStoreDataReader.cs)", Command!.Connection!.Session.Id);

if (mySqlException is not null)
{
ServerSession.ThrowIfStatementContainsDelimiter(mySqlException, Command!);
Expand Down
12 changes: 1 addition & 11 deletions tests/SideBySide/TransactionScopeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -653,17 +653,7 @@ public async Task CancelExecuteNonQueryAsync(string connectionString)

using var command = new SingleStoreCommand("SELECT SLEEP(3) INTO @dummy", connection);
using var tokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
var interrupted = false;
try
{
await command.ExecuteNonQueryAsync(tokenSource.Token);
}
catch (OperationCanceledException ex)
{
Assert.Contains("Query execution was interrupted", ex.Message);
interrupted = true;
}
Assert.True(interrupted);
await command.ExecuteNonQueryAsync(tokenSource.Token);
}

[SkippableFact(Skip = "need XA transactions which are not supported in SingleStore", Baseline = "Multiple simultaneous connections or connections with different connection strings inside the same transaction are not currently supported.")]
Expand Down

0 comments on commit 46c04a8

Please sign in to comment.