Skip to content

Commit

Permalink
refactor: applying same changes on netcore
Browse files Browse the repository at this point in the history
The same changes of netfx were replicated on netcore.
Also, ConfigureAwait(false) was added to lower the risk of
deadlock if getAwaiter. This approach is still needed to be
checked
  • Loading branch information
Farenheith committed Mar 4, 2024
1 parent 588ee4a commit 34e7372
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 443 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using Microsoft.Data.Common;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Data.ProviderBase
Expand Down Expand Up @@ -35,8 +35,8 @@ protected internal override DataTable GetSchema(DbConnectionFactory factory, DbC

protected override DbReferenceCollection CreateReferenceCollection() => throw ADP.ClosedConnectionError();

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
=> base.TryOpenConnectionInternal(outerConnection, connectionFactory, retry, userOptions);
internal override Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
=> base.TryOpenConnectionInternal(outerConnection, connectionFactory, cancellationToken, userOptions);
}

abstract internal class DbConnectionBusy : DbConnectionClosed
Expand All @@ -45,7 +45,7 @@ protected DbConnectionBusy(ConnectionState state) : base(state, true, false)
{
}

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal override Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
=> throw ADP.ConnectionAlreadyOpen(State);
}

Expand Down Expand Up @@ -84,32 +84,12 @@ internal override void CloseConnection(DbConnection owningObject, DbConnectionFa
connectionFactory.SetInnerConnectionTo(owningObject, DbConnectionClosedPreviouslyOpened.SingletonInstance);
}

internal override bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
=> TryOpenConnection(outerConnection, connectionFactory, retry, userOptions);
internal override Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
=> TryOpenConnection(outerConnection, connectionFactory, cancellationToken, userOptions);

internal override bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal override Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
if (retry == null || !retry.Task.IsCompleted)
{
// retry is null if this is a synchronous call

// if someone calls Open or OpenAsync while in this state,
// then the retry task will not be completed

throw ADP.ConnectionAlreadyOpen(State);
}

// we are completing an asynchronous open
Debug.Assert(retry.Task.Status == TaskStatus.RanToCompletion, "retry task must be completed successfully");
DbConnectionInternal openConnection = retry.Task.Result;
if (null == openConnection)
{
connectionFactory.SetInnerConnectionTo(outerConnection, this);
throw ADP.InternalConnectionError(ADP.ConnectionError.GetConnectionReturnsNull);
}
connectionFactory.SetInnerConnectionEvent(outerConnection, openConnection);

return true;
throw ADP.ConnectionAlreadyOpen(State);
}
}

Expand All @@ -134,7 +114,7 @@ private DbConnectionClosedPreviouslyOpened() : base(ConnectionState.Closed, true
{
}

internal override bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
=> TryOpenConnection(outerConnection, connectionFactory, retry, userOptions);
internal override Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
=> TryOpenConnection(outerConnection, connectionFactory, cancellationToken, userOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal abstract partial class DbConnectionFactory

// s_pendingOpenNonPooled is an array of tasks used to throttle creation of non-pooled connections to
// a maximum of Environment.ProcessorCount at a time.
private static uint s_pendingOpenNonPooledNext = 0;

private static Task<DbConnectionInternal>[] s_pendingOpenNonPooled = new Task<DbConnectionInternal>[Environment.ProcessorCount];
private static Task<DbConnectionInternal> s_completedTask;

Expand Down Expand Up @@ -96,7 +96,7 @@ internal virtual DbConnectionPoolProviderInfo CreateConnectionPoolProviderInfo(D
}


internal DbConnectionInternal CreateNonPooledConnection(DbConnection owningConnection, DbConnectionPoolGroup poolGroup, DbConnectionOptions userOptions)
internal async Task<DbConnectionInternal> CreateNonPooledConnection(DbConnection owningConnection, DbConnectionPoolGroup poolGroup, DbConnectionOptions userOptions)
{
Debug.Assert(null != owningConnection, "null owningConnection?");
Debug.Assert(null != poolGroup, "null poolGroup?");
Expand All @@ -105,7 +105,7 @@ internal DbConnectionInternal CreateNonPooledConnection(DbConnection owningConne
DbConnectionPoolGroupProviderInfo poolGroupProviderInfo = poolGroup.ProviderInfo;
DbConnectionPoolKey poolKey = poolGroup.PoolKey;

DbConnectionInternal newConnection = CreateConnection(connectionOptions, poolKey, poolGroupProviderInfo, null, owningConnection, userOptions);
DbConnectionInternal newConnection = await CreateConnection(connectionOptions, poolKey, poolGroupProviderInfo, null, owningConnection, userOptions);
if (null != newConnection)
{
SqlClientEventSource.Log.HardConnectRequest();
Expand All @@ -115,12 +115,12 @@ internal DbConnectionInternal CreateNonPooledConnection(DbConnection owningConne
return newConnection;
}

internal DbConnectionInternal CreatePooledConnection(DbConnectionPool pool, DbConnection owningObject, DbConnectionOptions options, DbConnectionPoolKey poolKey, DbConnectionOptions userOptions)
internal async Task<DbConnectionInternal> CreatePooledConnection(DbConnectionPool pool, DbConnection owningObject, DbConnectionOptions options, DbConnectionPoolKey poolKey, DbConnectionOptions userOptions)
{
Debug.Assert(null != pool, "null pool?");
DbConnectionPoolGroupProviderInfo poolGroupProviderInfo = pool.PoolGroup.ProviderInfo;

DbConnectionInternal newConnection = CreateConnection(options, poolKey, poolGroupProviderInfo, pool, owningObject, userOptions);
DbConnectionInternal newConnection = await CreateConnection(options, poolKey, poolGroupProviderInfo, pool, owningObject, userOptions);
if (null != newConnection)
{
SqlClientEventSource.Log.HardConnectRequest();
Expand Down Expand Up @@ -412,7 +412,7 @@ internal void QueuePoolGroupForRelease(DbConnectionPoolGroup poolGroup)
SqlClientEventSource.Log.ExitActiveConnectionPoolGroup();
}

virtual protected DbConnectionInternal CreateConnection(DbConnectionOptions options, DbConnectionPoolKey poolKey, object poolGroupProviderInfo, DbConnectionPool pool, DbConnection owningConnection, DbConnectionOptions userOptions)
virtual protected Task<DbConnectionInternal> CreateConnection(DbConnectionOptions options, DbConnectionPoolKey poolKey, object poolGroupProviderInfo, DbConnectionPool pool, DbConnection owningConnection, DbConnectionOptions userOptions)
{
return CreateConnection(options, poolKey, poolGroupProviderInfo, pool, owningConnection);
}
Expand Down Expand Up @@ -447,7 +447,7 @@ protected virtual DbMetaDataFactory CreateMetaDataFactory(DbConnectionInternal i
throw ADP.NotSupported();
}

abstract protected DbConnectionInternal CreateConnection(DbConnectionOptions options, DbConnectionPoolKey poolKey, object poolGroupProviderInfo, DbConnectionPool pool, DbConnection owningConnection);
abstract protected Task<DbConnectionInternal> CreateConnection(DbConnectionOptions options, DbConnectionPoolKey poolKey, object poolGroupProviderInfo, DbConnectionPool pool, DbConnection owningConnection);

abstract protected DbConnectionOptions CreateConnectionOptions(string connectionString, DbConnectionOptions previous);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,9 @@ internal void NotifyWeakReference(int message)
}
}

internal virtual void OpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory)
internal virtual async Task OpenConnection(CancellationToken cancellationToken, DbConnection outerConnection, DbConnectionFactory connectionFactory)
{
if (!TryOpenConnection(outerConnection, connectionFactory, null, null))
if (!await TryOpenConnection(outerConnection, connectionFactory, cancellationToken, null))
{
throw ADP.InternalError(ADP.InternalErrorCode.SynchronousConnectReturnedPending);
}
Expand All @@ -319,17 +319,17 @@ internal virtual void OpenConnection(DbConnection outerConnection, DbConnectionF
/// override this and do the correct thing.</devdoc>
// User code should either override DbConnectionInternal.Activate when it comes out of the pool
// or override DbConnectionFactory.CreateConnection when the connection is created for non-pooled connections
internal virtual bool TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal virtual Task<bool> TryOpenConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
throw ADP.ConnectionAlreadyOpen(State);
}

internal virtual bool TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
internal virtual Task<bool> TryReplaceConnection(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
throw ADP.MethodNotImplemented();
}

protected bool TryOpenConnectionInternal(DbConnection outerConnection, DbConnectionFactory connectionFactory, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions)
protected async Task<bool> TryOpenConnectionInternal(DbConnection outerConnection, DbConnectionFactory connectionFactory, CancellationToken cancellationToken, DbConnectionOptions userOptions)
{
// ?->Connecting: prevent set_ConnectionString during Open
if (connectionFactory.SetInnerConnectionFrom(outerConnection, DbConnectionClosedConnecting.SingletonInstance, this))
Expand All @@ -338,10 +338,7 @@ protected bool TryOpenConnectionInternal(DbConnection outerConnection, DbConnect
try
{
connectionFactory.PermissionDemand(outerConnection);
if (!connectionFactory.TryGetConnection(outerConnection, retry, userOptions, this, out openConnection))
{
return false;
}
openConnection = await connectionFactory.TryGetConnection(outerConnection, cancellationToken, userOptions, this);
}
catch
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ internal abstract partial class DbConnectionFactory
{
private static readonly Action<Task<DbConnectionInternal>, object> s_tryGetConnectionCompletedContinuation = TryGetConnectionCompletedContinuation;

internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, out DbConnectionInternal connection)
internal async Task<DbConnectionInternal> TryGetConnection(DbConnection owningConnection, CancellationToken cancellationToken, DbConnectionOptions userOptions, DbConnectionInternal oldConnection)
{
Debug.Assert(null != owningConnection, "null owningConnection?");

DbConnectionInternal connection;
DbConnectionPoolGroup poolGroup;
DbConnectionPool connectionPool;
connection = null;

// Work around race condition with clearing the pool between GetConnectionPool obtaining pool
// and GetConnection on the pool checking the pool state. Clearing the pool in this window
Expand All @@ -49,80 +48,19 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
// this connection should not be pooled via DbConnectionPool
// or have a disabled pool entry.
poolGroup = GetConnectionPoolGroup(owningConnection); // previous entry have been disabled

if (retry != null)
{
Task<DbConnectionInternal> newTask;
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
lock (s_pendingOpenNonPooled)
{
// look for an available task slot (completed or empty)
int idx;
for (idx = 0; idx < s_pendingOpenNonPooled.Length; idx++)
{
Task task = s_pendingOpenNonPooled[idx];
if (task == null)
{
s_pendingOpenNonPooled[idx] = GetCompletedTask();
break;
}
else if (task.IsCompleted)
{
break;
}
}

// if didn't find one, pick the next one in round-robin fashion
if (idx == s_pendingOpenNonPooled.Length)
{
idx = (int)(s_pendingOpenNonPooledNext % s_pendingOpenNonPooled.Length);
unchecked
{
s_pendingOpenNonPooledNext++;
}
}

// now that we have an antecedent task, schedule our work when it is completed.
// If it is a new slot or a completed task, this continuation will start right away.
newTask = CreateReplaceConnectionContinuation(s_pendingOpenNonPooled[idx], owningConnection, retry, userOptions, oldConnection, poolGroup, cancellationTokenSource);

// Place this new task in the slot so any future work will be queued behind it
s_pendingOpenNonPooled[idx] = newTask;
}

// Set up the timeout (if needed)
if (owningConnection.ConnectionTimeout > 0)
{
int connectionTimeoutMilliseconds = owningConnection.ConnectionTimeout * 1000;
cancellationTokenSource.CancelAfter(connectionTimeoutMilliseconds);
}

// once the task is done, propagate the final results to the original caller
newTask.ContinueWith(
continuationAction: s_tryGetConnectionCompletedContinuation,
state: Tuple.Create(cancellationTokenSource, retry),
scheduler: TaskScheduler.Default
);

return false;
}

connection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
connection = await CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
SqlClientEventSource.Log.EnterNonPooledConnection();
}
else
{
if (((SqlClient.SqlConnection)owningConnection).ForceNewConnection)
{
Debug.Assert(!(oldConnection is DbConnectionClosed), "Force new connection, but there is no old connection");
connection = connectionPool.ReplaceConnection(owningConnection, userOptions, oldConnection);
connection = await connectionPool.ReplaceConnection(owningConnection, userOptions, oldConnection);
}
else
{
if (!connectionPool.TryGetConnection(owningConnection, retry, userOptions, out connection))
{
return false;
}
connection = await connectionPool.TryGetConnection(owningConnection, cancellationToken, userOptions);
}

if (connection == null)
Expand Down Expand Up @@ -153,35 +91,7 @@ internal bool TryGetConnection(DbConnection owningConnection, TaskCompletionSour
throw ADP.PooledOpenTimeout();
}

return true;
}

private Task<DbConnectionInternal> CreateReplaceConnectionContinuation(Task<DbConnectionInternal> task, DbConnection owningConnection, TaskCompletionSource<DbConnectionInternal> retry, DbConnectionOptions userOptions, DbConnectionInternal oldConnection, DbConnectionPoolGroup poolGroup, CancellationTokenSource cancellationTokenSource)
{
return task.ContinueWith(
(_) =>
{
Transaction originalTransaction = ADP.GetCurrentTransaction();
try
{
ADP.SetCurrentTransaction(retry.Task.AsyncState as Transaction);
var newConnection = CreateNonPooledConnection(owningConnection, poolGroup, userOptions);
if ((oldConnection != null) && (oldConnection.State == ConnectionState.Open))
{
oldConnection.PrepareForReplaceConnection();
oldConnection.Dispose();
}
return newConnection;
}
finally
{
ADP.SetCurrentTransaction(originalTransaction);
}
},
cancellationTokenSource.Token,
TaskContinuationOptions.LongRunning,
TaskScheduler.Default
);
return connection;
}

private static void TryGetConnectionCompletedContinuation(Task<DbConnectionInternal> task, object state)
Expand Down
Loading

0 comments on commit 34e7372

Please sign in to comment.