Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Async Cancel #956

Merged
merged 10 commits into from
Sep 17, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -1404,19 +1404,13 @@ public int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -1820,19 +1814,15 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2017,18 +2007,15 @@ internal SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,18 @@ public TimeoutState(int value)
// 2) post first packet write, but before session return - a call to cancel will send an
// attention to the server
// 3) post session close - no attention is allowed
private bool _cancelled;
private const int _waitForCancellationLockPollTimeout = 100;
private WeakReference _cancellationOwner = new WeakReference(null);

private static class CancelState
{
public const int Unset = 0;
public const int Closed = 1;
public const int Cancelled = 2;
}

private int _cancelState;

// Cache the transaction for which this command was executed so upon completion we can
// decrement the appropriate result count.
internal SqlInternalTransaction _executedUnderTransaction;
Expand Down Expand Up @@ -629,68 +637,54 @@ internal void Activate(object owner)
Debug.Assert(result == 1, "invalid deactivate count");
}

internal bool SetCancelStateClosed()
{
return Interlocked.CompareExchange(ref _cancelState, CancelState.Closed, CancelState.Unset) == CancelState.Unset && _cancelState == CancelState.Closed;
}

// This method is only called by the command or datareader as a result of a user initiated
// cancel request.
internal void Cancel(object caller)
{
Debug.Assert(caller != null, "Null caller for Cancel!");
Debug.Assert(caller is SqlCommand || caller is SqlDataReader, "Calling API with invalid caller type: " + caller.GetType());

bool hasLock = false;
try
Interlocked.CompareExchange(ref _cancelState, CancelState.Cancelled, CancelState.Unset); // only change state if it is Unset, so don't check the return value

if (
(_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken) &&
(_cancellationOwner.Target == caller)
)
{
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
if (HasPendingData && !_attentionSent)
{
Monitor.TryEnter(this, _waitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// This lock is also protecting against concurrent close and async continuations

// Ensure that, once we have the lock, that we are still the owner
if ((!_cancelled) && (_cancellationOwner.Target == caller))
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_cancelled = true;

if (HasPendingData && !_attentionSent)
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
if (_parser.Connection.ThreadHasParserLockForClose)
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

// CancelRequest - use to cancel while writing a request to the server
Expand Down Expand Up @@ -777,7 +771,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelled = false;
_cancelState = CancelState.Unset;
_cancellationOwner.Target = null;

if (_attentionSent)
Expand Down Expand Up @@ -999,10 +993,10 @@ internal Task ExecuteFlush()
{
lock (this)
{
if (_cancelled && 1 == _outputPacketNumber)
if (_cancelState != CancelState.Unset && 1 == _outputPacketNumber)
{
ResetBuffer();
_cancelled = false;
_cancelState = CancelState.Unset;
throw SQL.OperationCancelled();
}
else
Expand Down Expand Up @@ -3353,7 +3347,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelled) && (_parser._asyncWrite);
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -3401,7 +3395,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -3987,7 +3981,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1757,19 +1757,13 @@ private int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2270,19 +2264,14 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2532,19 +2521,15 @@ private SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else

if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Loading