Skip to content

Commit

Permalink
Improve CommandTimeout
Browse files Browse the repository at this point in the history
Summary:
This diff contains some of the changes from the original connector (from 2.3 beta releases):
- Improving CommandTimeout
- OpenTcpSocketAsync/GetHostAddressesAsync: use new cancellation token overloads

Test Plan: https://app.circleci.com/pipelines/github/memsql/SingleStoreNETConnector/307/workflows/a768ba13-5e1e-49b3-a0a2-f7b66f840913

Reviewers: pmishchenko-ua

Reviewed By: pmishchenko-ua

Subscribers: engineering-list

JIRA Issues: PLAT-6674

Differential Revision: https://grizzly.internal.memcompute.com/D64552
  • Loading branch information
okramarenko committed Sep 12, 2023
1 parent ec495d0 commit 41245c2
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 46 deletions.
74 changes: 42 additions & 32 deletions src/SingleStoreConnector/Core/ICancellableCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ internal interface ICancellableCommand
{
int CommandId { get; }
int CommandTimeout { get; }
int? EffectiveCommandTimeout { get; set; }
int CancelAttemptCount { get; set; }
SingleStoreConnection? Connection { get; }
IDisposable? RegisterCancel(CancellationToken cancellationToken);
CancellationTokenRegistration RegisterCancel(CancellationToken cancellationToken);
void SetTimeout(int milliseconds);
bool IsTimedOut { get; }
}
Expand All @@ -23,22 +24,6 @@ internal static class ICancellableCommandExtensions
/// </summary>
public static int GetNextId() => Interlocked.Increment(ref s_id);

/// <summary>
/// Returns the time (in seconds) until a command should be canceled, clamping it to the maximum time
/// allowed including CancellationTimeout.
/// </summary>
public static int GetCommandTimeUntilCanceled(this ICancellableCommand command)
{
var commandTimeout = command.CommandTimeout;
var session = command.Connection?.Session;
if (commandTimeout == 0 || session is null)
return 0;

// the total cancellation period (graphically) is [===CommandTimeout===][=CancellationTimeout=], which can't
// exceed int.MaxValue/1000 because it has to be multiplied by 1000 to be converted to milliseconds
return Math.Min(commandTimeout, Math.Max(1, (int.MaxValue / 1000) - session.CancellationTimeout));
}

/// <summary>
/// Causes the effective command timeout to be reset back to the value specified by <see cref="ICancellableCommand.CommandTimeout"/>
/// plus <see cref="SingleStoreConnectionStringBuilder.CancellationTimeout"/>. This allows for the command to time out, a cancellation to attempt
Expand All @@ -53,28 +38,53 @@ public static int GetCommandTimeUntilCanceled(this ICancellableCommand command)
/// method call.</remarks>
public static void ResetCommandTimeout(this ICancellableCommand command)
{
// read value cached on the command
var effectiveCommandTimeout = command.EffectiveCommandTimeout;

// early out if there is no timeout
if (effectiveCommandTimeout == Constants.InfiniteTimeout)
return;

var session = command.Connection?.Session;
if (session is not null)
if (session is null)
return;

// determine the effective command timeout if not already cached
if (effectiveCommandTimeout is null)
{
if (command.CommandTimeout == 0 || session.CancellationTimeout == 0)
var commandTimeout = command.CommandTimeout;
var cancellationTimeout = session.CancellationTimeout;

if (commandTimeout == 0 || cancellationTimeout == 0)
{
session.SetTimeout(Constants.InfiniteTimeout);
// if commandTimeout is zero, then cancellation doesn't occur
effectiveCommandTimeout = Constants.InfiniteTimeout;
}
else
{
var commandTimeUntilCanceled = command.GetCommandTimeUntilCanceled() * 1000;
if (session.CancellationTimeout > 0)
{
// try to cancel first, then close socket
command.SetTimeout(commandTimeUntilCanceled);
session.SetTimeout(commandTimeUntilCanceled + session.CancellationTimeout * 1000);
}
else
{
// close socket once the timeout is reached
session.SetTimeout(commandTimeUntilCanceled);
}
// the total cancellation period (graphically) is [===CommandTimeout===][=CancellationTimeout=], which can't
// exceed int.MaxValue/1000 because it has to be multiplied by 1000 to be converted to milliseconds
effectiveCommandTimeout = Math.Min(commandTimeout, Math.Max(1, (int.MaxValue / 1000) - Math.Max(0, session.CancellationTimeout))) * 1000;
}

command.EffectiveCommandTimeout = effectiveCommandTimeout;
}

if (effectiveCommandTimeout == Constants.InfiniteTimeout)
{
// for no timeout, we set an infinite timeout once (then early out above)
session.SetTimeout(Constants.InfiniteTimeout);
}
else if (session.CancellationTimeout > 0)
{
// try to cancel first, then close socket
command.SetTimeout(effectiveCommandTimeout.Value);
session.SetTimeout(effectiveCommandTimeout.Value + (session.CancellationTimeout * 1000));
}
else
{
// close socket once the timeout is reached
session.SetTimeout(effectiveCommandTimeout.Value);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/SingleStoreConnector/Core/ResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ public async Task<bool> ReadAsync(IOBehavior ioBehavior, CancellationToken cance
if (BufferState is ResultSetState.HasMoreData or ResultSetState.NoMoreData or ResultSetState.None)
return new ValueTask<Row?>(default(Row?));

using var registration = Command.CancellableCommand.RegisterCancel(cancellationToken);
var payloadValueTask = Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None);
return payloadValueTask.IsCompletedSuccessfully
? new ValueTask<Row?>(ScanRowAsyncRemainder(this, payloadValueTask.Result, row))
Expand Down
4 changes: 4 additions & 0 deletions src/SingleStoreConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1104,7 +1104,11 @@ private async Task<bool> OpenTcpSocketAsync(ConnectionSettings cs, ILoadBalancer
{
if (ioBehavior == IOBehavior.Asynchronous)
{
#if NET5_0_OR_GREATER
await tcpClient.ConnectAsync(ipAddress, cs.Port, cancellationToken).ConfigureAwait(false);
#else
await tcpClient.ConnectAsync(ipAddress, cs.Port).ConfigureAwait(false);
#endif
}
else
{
Expand Down
28 changes: 19 additions & 9 deletions src/SingleStoreConnector/SingleStoreBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
private DbDataReader ExecuteDbDataReader(CommandBehavior behavior)
#endif
{
((ICancellableCommand) this).ResetCommandTimeout();
this.ResetCommandTimeout();
return ExecuteReaderAsync(behavior, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
}

Expand All @@ -143,7 +143,7 @@ protected override async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBeha
private async Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
#endif
{
((ICancellableCommand) this).ResetCommandTimeout();
this.ResetCommandTimeout();
using var registration = ((ICancellableCommand) this).RegisterCancel(cancellationToken);
return await ExecuteReaderAsync(behavior, AsyncIOBehavior, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -191,11 +191,19 @@ public Task<int> ExecuteNonQueryAsync(CancellationToken cancellationToken = defa
#endif
ExecuteScalarAsync(AsyncIOBehavior, cancellationToken);

public
#if NET6_0_OR_GREATER
public override int Timeout { get; set; }
#else
public int Timeout { get; set; }
override
#endif
int Timeout
{
get => m_timeout;
set
{
m_timeout = value;
((ICancellableCommand) this).EffectiveCommandTimeout = null;
}
}

#if NET6_0_OR_GREATER
public override void Prepare()
Expand Down Expand Up @@ -247,12 +255,13 @@ public void Dispose()

int ICancellableCommand.CommandId => m_commandId;
int ICancellableCommand.CommandTimeout => Timeout;
int? ICancellableCommand.EffectiveCommandTimeout { get; set; }
int ICancellableCommand.CancelAttemptCount { get; set; }

IDisposable? ICancellableCommand.RegisterCancel(CancellationToken cancellationToken)
CancellationTokenRegistration ICancellableCommand.RegisterCancel(CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
return null;
return default;

m_cancelAction ??= Cancel;
return cancellationToken.Register(m_cancelAction);
Expand Down Expand Up @@ -282,7 +291,7 @@ private void CancelCommandForTimeout()

private async Task<int> ExecuteNonQueryAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
((ICancellableCommand) this).ResetCommandTimeout();
this.ResetCommandTimeout();
using var registration = ((ICancellableCommand) this).RegisterCancel(cancellationToken);
using var reader = await ExecuteReaderAsync(CommandBehavior.Default, ioBehavior, cancellationToken).ConfigureAwait(false);
do
Expand All @@ -296,7 +305,7 @@ private async Task<int> ExecuteNonQueryAsync(IOBehavior ioBehavior, Cancellation

private async Task<object?> ExecuteScalarAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
((ICancellableCommand) this).ResetCommandTimeout();
this.ResetCommandTimeout();
using var registration = ((ICancellableCommand) this).RegisterCancel(cancellationToken);
var hasSetResult = false;
object? result = null;
Expand Down Expand Up @@ -402,6 +411,7 @@ private bool IsPrepared

private readonly int m_commandId;
private bool m_isDisposed;
private int m_timeout;
private Action? m_cancelAction;
private Action? m_cancelForCommandTimeoutAction;
private uint m_cancelTimerId;
Expand Down
15 changes: 11 additions & 4 deletions src/SingleStoreConnector/SingleStoreCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ private SingleStoreCommand(SingleStoreCommand other)
{
GC.SuppressFinalize(this);
m_commandTimeout = other.m_commandTimeout;
((ICancellableCommand) this).EffectiveCommandTimeout = null;
m_commandType = other.m_commandType;
DesignTimeVisible = other.DesignTimeVisible;
UpdatedRowSource = other.UpdatedRowSource;
Expand Down Expand Up @@ -226,8 +227,12 @@ public override string CommandText
public override int CommandTimeout
{
get => Math.Min(m_commandTimeout ?? Connection?.DefaultCommandTimeout ?? 0, int.MaxValue / 1000);
set => m_commandTimeout = value >= 0 ? value : throw new ArgumentOutOfRangeException(nameof(value), "CommandTimeout must be greater than or equal to zero.");
}
set
{
m_commandTimeout = value >= 0 ? value : throw new ArgumentOutOfRangeException(nameof(value), "CommandTimeout must be greater than or equal to zero.");
((ICancellableCommand) this).EffectiveCommandTimeout = null;
}
}

/// <inheritdoc/>
public override CommandType CommandType
Expand Down Expand Up @@ -385,10 +390,10 @@ public Task DisposeAsync()
/// <returns>An object that must be disposed to revoke the cancellation registration.</returns>
/// <remarks>This method is more efficient than calling <code>token.Register(Command.Cancel)</code> because it avoids
/// unnecessary allocations.</remarks>
IDisposable? ICancellableCommand.RegisterCancel(CancellationToken cancellationToken)
CancellationTokenRegistration ICancellableCommand.RegisterCancel(CancellationToken cancellationToken)
{
if (!cancellationToken.CanBeCanceled)
return null;
return default;

m_cancelAction ??= Cancel;
return cancellationToken.Register(m_cancelAction);
Expand All @@ -410,6 +415,8 @@ void ICancellableCommand.SetTimeout(int milliseconds)

int ICancellableCommand.CommandId => m_commandId;

int? ICancellableCommand.EffectiveCommandTimeout { get; set; }

int ICancellableCommand.CancelAttemptCount { get; set; }

ICancellableCommand ISingleStoreCommand.CancellableCommand => this;
Expand Down

0 comments on commit 41245c2

Please sign in to comment.