Skip to content

Commit

Permalink
fix: Making open operation async first
Browse files Browse the repository at this point in the history
Making the open operation async first simplifies the OpenAsync logic
and allows to use the same logic for sync Open, through GetAwaiter
  • Loading branch information
Farenheith committed Mar 3, 2024
1 parent ced726a commit a896f6b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 407 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Microsoft.Data.ProviderBase
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Data.Common;
using SysTx = System.Transactions;
Expand Down Expand Up @@ -66,9 +67,9 @@ protected override DbReferenceCollection CreateReferenceCollection()
throw ADP.ClosedConnectionError();
}

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal override Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
return base.TryOpenConnectionInternal(outerConnection, connectionFactory, retry, userOptions);
return base.TryOpenConnectionInternal(outerConnection, connectionFactory, cancellationToken, userOptions);
}
}

Expand All @@ -78,11 +79,6 @@ abstract internal class DbConnectionBusy : DbConnectionClosed
protected DbConnectionBusy(ConnectionState state) : base(state, true, false)
{
}

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
{
throw ADP.ConnectionAlreadyOpen(State);
}
}

sealed internal class DbConnectionClosedBusy : DbConnectionBusy
Expand Down Expand Up @@ -120,35 +116,9 @@ internal override void CloseConnection(DbConnection owningObject, DbConnectionFa
connectionFactory.SetInnerConnectionTo(owningObject, DbConnectionClosedPreviouslyOpened.SingletonInstance);
}

internal override bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
{
return TryOpenConnection(outerConnection, connectionFactory, retry, userOptions);
}

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal override Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{

if (retry == null || !retry.Task.IsCompleted)
{
// retry is null if this is a synchronous call

// if someone calls Open or OpenAsync while in this state,
// then the retry task will not be completed

throw ADP.ConnectionAlreadyOpen(State);
}

// we are completing an asynchronous open
Debug.Assert(retry.Task.Status == TaskStatus.RanToCompletion, "retry task must be completed successfully");
DbConnectionInternal openConnection = retry.Task.Result;
if (null == openConnection)
{
connectionFactory.SetInnerConnectionTo(outerConnection, this);
throw ADP.InternalConnectionError(ADP.ConnectionError.GetConnectionReturnsNull);
}
connectionFactory.SetInnerConnectionEvent(outerConnection, openConnection);

return true;
return TryOpenConnection(outerConnection, connectionFactory, cancellationToken, userOptions);
}
}

Expand All @@ -172,9 +142,9 @@ private DbConnectionClosedPreviouslyOpened() : base(ConnectionState.Closed, true
{
}

internal override bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal override Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
return TryOpenConnection(outerConnection, connectionFactory, retry, userOptions);
return TryOpenConnection(outerConnection, connectionFactory, cancellationToken, userOptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ internal abstract class DbConnectionFactory

// s_pendingOpenNonPooled is an array of tasks used to throttle creation of non-pooled connections to
// a maximum of Environment.ProcessorCount at a time.
static int s_pendingOpenNonPooledNext = 0;
static Task<DbConnectionInternal>[] s_pendingOpenNonPooled = new Task<DbConnectionInternal>[Environment.ProcessorCount];
static Task<DbConnectionInternal> s_completedTask;

Expand Down Expand Up @@ -194,13 +193,13 @@ static Task<DbConnectionInternal> GetCompletedTask()
return s_completedTask;
}

internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, out DbConnectionInternal connection)
internal async Task<(bool, DbConnectionInternal)> TryGetConnection(DbConnection owningConnection, CancellationToken cancellationToken, DbConnectionOptions userOptions, DbConnectionInternal oldConnection)
{
DbConnectionInternal connection;
Debug.Assert(null != owningConnection, "null owningConnection?");

DbConnectionPoolGroup poolGroup;
DbConnectionPool connectionPool;
connection = null;

// SQLBU 431251:
// Work around race condition with clearing the pool between GetConnectionPool obtaining pool
Expand All @@ -226,104 +225,6 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
// this connection should not be pooled via DbConnectionPool
// or have a disabled pool entry.
poolGroup = GetConnectionPoolGroup(owningConnection); // previous entry have been disabled

if (retry != null)
{
Task<DbConnectionInternal> newTask;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
lock (s_pendingOpenNonPooled)
{

// look for an available task slot (completed or empty)
int idx;
for (idx = 0; idx < s_pendingOpenNonPooled.Length; idx++)
{
Task task = s_pendingOpenNonPooled[idx];
if (task == null)
{
s_pendingOpenNonPooled[idx] = GetCompletedTask();
break;
}
else if (task.IsCompleted)
{
break;
}
}

// if didn't find one, pick the next one in round-robbin fashion
if (idx == s_pendingOpenNonPooled.Length)
{
idx = s_pendingOpenNonPooledNext++ % s_pendingOpenNonPooled.Length;
}

// now that we have an antecedent task, schedule our work when it is completed.
// If it is a new slot or a completed task, this continuation will start right away.
// BUG? : If we have timed out task on top of running task, then new task could be started
// on top of that, since we are only checking the top task. This will lead to starting more threads
// than intended.
newTask = s_pendingOpenNonPooled[idx].ContinueWith((_) =>
{
System.Transactions.Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as System.Transactions.Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
}, cancellationTokenSource.Token, TaskContinuationOptions.LongRunning, TaskScheduler.Default);

// Place this new task in the slot so any future work will be queued behind it
s_pendingOpenNonPooled[idx] = newTask;
}

// Set up the timeout (if needed)
if (owningConnection.ConnectionTimeout > 0)
{
int connectionTimeoutMilliseconds = owningConnection.ConnectionTimeout * 1000;
cancellationTokenSource.CancelAfter(connectionTimeoutMilliseconds);
}

// once the task is done, propagate the final results to the original caller
newTask.ContinueWith((task) =>
{
cancellationTokenSource.Dispose();
if (task.IsCanceled)
{
retry.TrySetException(ADP.ExceptionWithStackTrace(ADP.NonPooledOpenTimeout()));
}
else if (task.IsFaulted)
{
retry.TrySetException(task.Exception.InnerException);
}
else
{
if (retry.TrySetResult(task.Result))
{
PerformanceCounters.NumberOfNonPooledConnections.Increment();
}
else
{
// The outer TaskCompletionSource was already completed
// Which means that we don't know if someone has messed with the outer connection in the middle of creation
// So the best thing to do now is to destroy the newly created connection
task.Result.DoomThisConnection();
task.Result.Dispose();
}
}
}, TaskScheduler.Default);

return false;
}

connection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
PerformanceCounters.NumberOfNonPooledConnections.Increment();
}
Expand All @@ -336,9 +237,9 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
}
else
{
if (!connectionPool.TryGetConnection(owningConnection, retry, userOptions, out connection))
if (!connectionPool.TryGetConnection(owningConnection, userOptions, out connection))
{
return false;
return (false, connection);
}
}

Expand All @@ -357,7 +258,8 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
// We've hit the race condition, where the pool was shut down after we got it from the group.
// Yield time slice to allow shut down activities to complete and a new, running pool to be instantiated
// before retrying.
System.Threading.Thread.Sleep(timeBetweenRetriesMilliseconds);
await Task.Delay(timeBetweenRetriesMilliseconds);
if (cancellationToken.IsCancellationRequested) throw new OperationCanceledException();
timeBetweenRetriesMilliseconds *= 2; // double the wait time for next iteration
}
}
Expand All @@ -372,7 +274,7 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
throw ADP.PooledOpenTimeout();
}

return true;
return (true, connection);
}

private DbConnectionPool GetConnectionPool(DbConnection owningObject, DbConnectionPoolGroup connectionPoolGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,42 +725,33 @@ internal void NotifyWeakReference(int message)
}
}

internal virtual void OpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory)
{
if (!TryOpenConnection(outerConnection, connectionFactory, null, null))
{
throw ADP.InternalError(ADP.InternalErrorCode.SynchronousConnectReturnedPending);
}
}

/// <devdoc>The default implementation is for the open connection objects, and
/// it simply throws. Our private closed-state connection objects
/// override this and do the correct thing.</devdoc>
// User code should either override DbConnectionInternal.Activate when it comes out of the pool
// or override DbConnectionFactory.CreateConnection when the connection is created for non-pooled connections
internal virtual bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal virtual Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
throw ADP.ConnectionAlreadyOpen(State);
}

internal virtual bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal virtual Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
throw ADP.MethodNotImplemented("TryReplaceConnection");
}

protected bool TryOpenConnectionInternal(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
protected async Task<bool> TryOpenConnectionInternal(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
// ?->Connecting: prevent set_ConnectionString during Open
if (connectionFactory.SetInnerConnectionFrom(outerConnection, DbConnectionClosedConnecting.SingletonInstance, this))
{
DbConnectionInternal openConnection = null;
DbConnectionInternal openConnection;
try
{
connectionFactory.PermissionDemand(outerConnection);
if (!connectionFactory.TryGetConnection(outerConnection, retry, userOptions, this, out openConnection))
{
return false;
}
bool result;
(result, openConnection) = await connectionFactory.TryGetConnection(outerConnection, cancellationToken, userOptions, this);
if (!result) return false;
}
catch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1275,22 +1275,15 @@ void WaitForPendingOpen()
} while (_pendingOpens.TryPeek(out next));
}

internal bool TryGetConnection(DbConnection owningObject, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, out DbConnectionInternal connection)
internal bool TryGetConnection(DbConnection owningObject, DbConnectionOptions userOptions, out DbConnectionInternal connection)
{

uint waitForMultipleObjectsTimeout = 0;
bool allowCreate = false;
var waitForMultipleObjectsTimeout = (uint)CreationTimeout;
var allowCreate = true;

if (retry == null)
{
waitForMultipleObjectsTimeout = (uint)CreationTimeout;

// VSTFDEVDIV 445531: set the wait timeout to INFINITE (-1) if the SQL connection timeout is 0 (== infinite)
if (waitForMultipleObjectsTimeout == 0)
waitForMultipleObjectsTimeout = unchecked((uint)Timeout.Infinite);

allowCreate = true;
}
// VSTFDEVDIV 445531: set the wait timeout to INFINITE (-1) if the SQL connection timeout is 0 (== infinite)
if (waitForMultipleObjectsTimeout == 0)
waitForMultipleObjectsTimeout = unchecked((uint)Timeout.Infinite);

if (_state != State.Running)
{
Expand All @@ -1304,30 +1297,8 @@ internal bool TryGetConnection(DbConnection owningObject, TaskCompletionSource<D
{
return true;
}
else if (retry == null)
{
// timed out on a sync call
return true;
}

var pendingGetConnection =
new PendingGetConnection(
CreationTimeout == 0 ? Timeout.Infinite : ADP.TimerCurrent() + ADP.TimerFromSeconds(CreationTimeout / 1000),
owningObject,
retry,
userOptions);
_pendingOpens.Enqueue(pendingGetConnection);

// it is better to StartNew too many times than not enough
if (_pendingOpensWaiting == 0)
{
Thread waitOpenThread = new Thread(WaitForPendingOpen);
waitOpenThread.IsBackground = true;
waitOpenThread.Start();
}

connection = null;
return false;
// timed out on a sync call
return true;
}

[SuppressMessage("Microsoft.Reliability", "CA2001:AvoidCallingProblematicMethods")] // copied from Triaged.cs
Expand Down
Loading

0 comments on commit a896f6b

Please sign in to comment.