Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Fix Async Cancel (#956)" #1352

Merged
merged 1 commit into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1450,13 +1450,19 @@ public int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -1865,15 +1871,19 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();

if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
{
return EndExecuteXmlReaderInternal(asyncResult);
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2059,15 +2069,18 @@ internal SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();

if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
return EndExecuteReaderInternal(asyncResult);
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,10 @@ public TimeoutState(int value)
// 2) post first packet write, but before session return - a call to cancel will send an
// attention to the server
// 3) post session close - no attention is allowed
private bool _cancelled;
private const int _waitForCancellationLockPollTimeout = 100;
private WeakReference _cancellationOwner = new WeakReference(null);

private static class CancelState
{
public const int Unset = 0;
public const int Closed = 1;
public const int Cancelled = 2;
}

private int _cancelState;

// Cache the transaction for which this command was executed so upon completion we can
// decrement the appropriate result count.
internal SqlInternalTransaction _executedUnderTransaction;
Expand Down Expand Up @@ -631,50 +623,68 @@ internal void Activate(object owner)
Debug.Assert(result == 1, "invalid deactivate count");
}

internal bool SetCancelStateClosed()
{
return Interlocked.CompareExchange(ref _cancelState, CancelState.Closed, CancelState.Unset) == CancelState.Unset && _cancelState == CancelState.Closed;
}

// This method is only called by the command or datareader as a result of a user initiated
// cancel request.
internal void Cancel(object caller)
{
Debug.Assert(caller != null, "Null caller for Cancel!");
Debug.Assert(caller is SqlCommand || caller is SqlDataReader, "Calling API with invalid caller type: " + caller.GetType());

// only change state if it is Unset, so don't check the return value
Interlocked.CompareExchange(ref _cancelState, CancelState.Cancelled, CancelState.Unset);

if ((_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken)
&& (_cancellationOwner.Target == caller) && HasPendingData && !_attentionSent)
bool hasLock = false;
try
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
Monitor.TryEnter(this, _waitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// This lock is also protecting against concurrent close and async continuations

// Ensure that, once we have the lock, that we are still the owner
if ((!_cancelled) && (_cancellationOwner.Target == caller))
{
if (_parser.Connection.ThreadHasParserLockForClose)
_cancelled = true;

if (HasPendingData && !_attentionSent)
{
_parser.Connection.ThreadHasParserLockForClose = false;
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: _waitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
_parser.Connection._parserLock.Release();
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

// CancelRequest - use to cancel while writing a request to the server
Expand Down Expand Up @@ -761,7 +771,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelState = CancelState.Unset;
_cancelled = false;
_cancellationOwner.Target = null;

if (_attentionSent)
Expand Down Expand Up @@ -983,10 +993,10 @@ internal Task ExecuteFlush()
{
lock (this)
{
if (_cancelState != CancelState.Unset && 1 == _outputPacketNumber)
if (_cancelled && 1 == _outputPacketNumber)
{
ResetBuffer();
_cancelState = CancelState.Unset;
_cancelled = false;
throw SQL.OperationCancelled();
}
else
Expand Down Expand Up @@ -3344,7 +3354,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
bool willCancel = (_cancelled) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -3392,7 +3402,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -3978,7 +3988,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1779,13 +1779,19 @@ private int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2293,14 +2299,19 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
{
return EndExecuteXmlReaderInternal(asyncResult);
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2547,15 +2558,19 @@ private SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();

if (!_internalEndExecuteInitiated && _stateObj != null)
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else
{
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
return EndExecuteReaderInternal(asyncResult);
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Loading