Skip to content

Commit

Permalink
Exception handling merge
Browse files Browse the repository at this point in the history
Merge exception handling for SqlBulkCopy, SqlCommandBuilder, SqlDataReader, SqlInternalConnection, SqlTransaction and (partially) SqlConnection
  • Loading branch information
edwardneal committed Feb 24, 2025
1 parent 3d652d4 commit 178d06f
Show file tree
Hide file tree
Showing 10 changed files with 792 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1969,8 +1969,10 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok
_parserLock = internalConnection._parserLock;
_parserLock.Wait(canReleaseFromAnyThread: _isAsyncBulkCopy);

TdsParser bestEffortCleanupTarget = null;
try
{
bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(_connection);
WriteRowSourceToServerCommon(columnCount); // This is common in both sync and async
Task resultTask = WriteToServerInternalAsync(ctoken); // resultTask is null for sync, but Task for async.
if (resultTask != null)
Expand Down Expand Up @@ -2018,6 +2020,7 @@ private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctok
catch (System.Threading.ThreadAbortException e)
{
_connection.Abort(e);
SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
throw;
}
finally
Expand Down Expand Up @@ -2278,7 +2281,8 @@ private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> sour
{
source.SetResult(null);
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
}

Expand Down Expand Up @@ -2415,8 +2419,8 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
resultTask = source.Task;

AsyncHelper.ContinueTaskWithState(readTask, source, this,
onSuccess: (object state) => ((SqlBulkCopy)state).CopyRowsAsync(i + 1, totalRows, cts, source)

onSuccess: (object state) => ((SqlBulkCopy)state).CopyRowsAsync(i + 1, totalRows, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
return resultTask; // Associated task will be completed when all rows are copied to server/exception/cancelled.
}
Expand All @@ -2440,10 +2444,12 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts,
else
{
AsyncHelper.ContinueTaskWithState(readTask, source, sqlBulkCopy,
onSuccess: (object state2) => ((SqlBulkCopy)state2).CopyRowsAsync(i + 1, totalRows, cts, source)
onSuccess: (object state2) => ((SqlBulkCopy)state2).CopyRowsAsync(i + 1, totalRows, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
return resultTask;
}
Expand Down Expand Up @@ -2523,7 +2529,8 @@ private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string up
// Continuation finished sync, recall into CopyBatchesAsync to continue
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
return source.Task;
}
Expand Down Expand Up @@ -2590,7 +2597,8 @@ private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults,
}
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false),
onCancellation: static (object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: true)
onCancellation: static (object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: true),
connectionToDoom: _connection.GetOpenTdsConnection()
);

return source.Task;
Expand Down Expand Up @@ -2656,7 +2664,8 @@ private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internal
// Always call back into CopyBatchesAsync
sqlBulkCopy.CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
},
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false)
onFailure: static (Exception _, object state) => ((SqlBulkCopy)state).CopyBatchesAsyncContinuedOnError(cleanupParser: false),
connectionToDoom: _connection.GetOpenTdsConnection()
);
return source.Task;
}
Expand Down Expand Up @@ -2819,7 +2828,8 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int
}
}
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
return;
}
Expand Down Expand Up @@ -2938,6 +2948,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
connectionToAbort: _connection,
onFailure: static (Exception _, object state) => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
onCancellation: static (object state) => ((StrongBox<CancellationTokenRegistration>)state).Value.Dispose(),
exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
Expand Down Expand Up @@ -2989,7 +3000,8 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio
if (internalResultsTask != null)
{
AsyncHelper.ContinueTaskWithState(internalResultsTask, source, this,
onSuccess: (object state) => ((SqlBulkCopy)state).WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source)
onSuccess: (object state) => ((SqlBulkCopy)state).WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source),
connectionToDoom: _connection.GetOpenTdsConnection()
);
}
else
Expand Down Expand Up @@ -3073,7 +3085,8 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
{
sqlBulkCopy.WriteToServerInternalRestAsync(ctoken, source); // Passing the same completion which will be completed by the Callee.
}
}
},
connectionToDoom: _connection.GetOpenTdsConnection()
);
return resultTask;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1233,11 +1233,30 @@ public override void ChangeDatabase(string database)
SqlStatistics statistics = null;
RepairInnerConnection();
SqlClientEventSource.Log.TryCorrelationTraceEvent("SqlConnection.ChangeDatabase | API | Correlation | Object Id {0}, Activity Id {1}, Database {2}", ObjectID, ActivityCorrelator.Current, database);
TdsParser bestEffortCleanupTarget = null;

try
{
bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(this);
statistics = SqlStatistics.StartTimer(Statistics);
InnerConnection.ChangeDatabase(database);
}
catch (System.OutOfMemoryException e)
{
Abort(e);
throw;
}
catch (System.StackOverflowException e)
{
Abort(e);
throw;
}
catch (System.Threading.ThreadAbortException e)
{
Abort(e);
SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
throw;
}
finally
{
SqlStatistics.StopTimer(statistics);
Expand Down Expand Up @@ -1301,10 +1320,12 @@ public override void Close()
}

SqlStatistics statistics = null;
TdsParser bestEffortCleanupTarget = null;

Exception e = null;
try
{
bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(this);
statistics = SqlStatistics.StartTimer(Statistics);

Task reconnectTask = _currentReconnectionTask;
Expand All @@ -1330,6 +1351,25 @@ public override void Close()
_statistics._closeTimestamp = ADP.TimerCurrent();
}
}
catch (System.OutOfMemoryException ex)
{
e = ex;
Abort(ex);
throw;
}
catch (System.StackOverflowException ex)
{
e = ex;
Abort(ex);
throw;
}
catch (System.Threading.ThreadAbortException ex)
{
e = ex;
Abort(ex);
SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
throw;
}
catch (Exception ex)
{
e = ex;
Expand Down
Loading

0 comments on commit 178d06f

Please sign in to comment.