Skip to content

Commit

Permalink
perf: rework TdsParserStateObject and SqlDataReader snapshots (#198)
Browse files Browse the repository at this point in the history
* rework TdsParserStateObject snapshots

* cache SqlDataReader snapshots
  • Loading branch information
Wraith2 authored and David-Engel committed Nov 6, 2019
1 parent a606f11 commit 178fe51
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ internal class SharedState

private Task _currentTask;
private Snapshot _snapshot;
private Snapshot _cachedSnapshot;
private CancellationTokenSource _cancelAsyncOnCloseTokenSource;
private CancellationToken _cancelAsyncOnCloseToken;

Expand Down Expand Up @@ -803,7 +804,7 @@ private bool TryCleanPartialRead()
}

#if DEBUG
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -936,7 +937,7 @@ private bool TryCloseInternal(bool closeReader)

try
{
if ((!_isClosed) && (parser != null) && (stateObj != null) && (stateObj._pendingData))
if ((!_isClosed) && (parser != null) && (stateObj != null) && (stateObj.HasPendingData))
{
// It is possible for this to be called during connection close on a
// broken connection, so check state first.
Expand Down Expand Up @@ -1118,7 +1119,7 @@ private bool TryConsumeMetaData()
{
// warning: Don't check the MetaData property within this function
// warning: as it will be a reentrant call
while (_parser != null && _stateObj != null && _stateObj._pendingData && !_metaDataConsumed)
while (_parser != null && _stateObj != null && _stateObj.HasPendingData && !_metaDataConsumed)
{
if (_parser.State == TdsParserState.Broken || _parser.State == TdsParserState.Closed)
{
Expand Down Expand Up @@ -3053,7 +3054,7 @@ private bool TryHasMoreResults(out bool moreResults)

Debug.Assert(null != _command, "unexpected null command from the data reader!");

while (_stateObj._pendingData)
while (_stateObj.HasPendingData)
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -3137,7 +3138,7 @@ private bool TryHasMoreRows(out bool moreRows)
moreRows = false;
return true;
}
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
// Consume error's, info's, done's on HasMoreRows, so user obtains error on Read.
byte b;
Expand Down Expand Up @@ -3178,7 +3179,7 @@ private bool TryHasMoreRows(out bool moreRows)
moreRows = false;
return false;
}
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
if (!_stateObj.TryPeekByte(out b))
{
Expand Down Expand Up @@ -3451,7 +3452,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
if (moreRows)
{
// read the row from the backend (unless it's an altrow were the marker is already inside the altrow ...)
while (_stateObj._pendingData)
while (_stateObj.HasPendingData)
{
if (_altRowStatus != ALTROWSTATUS.AltRow)
{
Expand Down Expand Up @@ -3483,7 +3484,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
}
}

if (!_stateObj._pendingData)
if (!_stateObj.HasPendingData)
{
if (!TryCloseInternal(false /*closeReader*/))
{
Expand All @@ -3507,7 +3508,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
{
// if we are in SingleRow mode, and we've read the first row,
// read the rest of the rows, if any
while (_stateObj._pendingData && !_sharedState._dataReady)
while (_stateObj.HasPendingData && !_sharedState._dataReady)
{
if (!_parser.TryRun(RunBehavior.ReturnImmediately, _command, this, null, _stateObj, out _sharedState._dataReady))
{
Expand Down Expand Up @@ -3548,7 +3549,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
more = false;

#if DEBUG
if ((!_sharedState._dataReady) && (_stateObj._pendingData))
if ((!_sharedState._dataReady) && (_stateObj.HasPendingData))
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -3770,6 +3771,10 @@ private bool TryReadColumnInternal(int i, bool readHeaderOnly = false)
{
// reset snapshot to save memory use. We can safely do that here because all SqlDataReader values are stable.
// The retry logic can use the current values to get back to the right state.
if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
Expand Down Expand Up @@ -4659,6 +4664,10 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)
if (!rowTokenRead)
{
rowTokenRead = true;
if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
Expand Down Expand Up @@ -5112,28 +5121,27 @@ private void PrepareAsyncInvocation(bool useSnapshot)

if (_snapshot == null)
{
_snapshot = new Snapshot
{
_dataReady = _sharedState._dataReady,
_haltRead = _haltRead,
_metaDataConsumed = _metaDataConsumed,
_browseModeInfoConsumed = _browseModeInfoConsumed,
_hasRows = _hasRows,
_altRowStatus = _altRowStatus,
_nextColumnDataToRead = _sharedState._nextColumnDataToRead,
_nextColumnHeaderToRead = _sharedState._nextColumnHeaderToRead,
_columnDataBytesRead = _columnDataBytesRead,
_columnDataBytesRemaining = _sharedState._columnDataBytesRemaining,

// _metadata and _altaMetaDataSetCollection must be Cloned
// before they are updated
_metadata = _metaData,
_altMetaDataSetCollection = _altMetaDataSetCollection,
_tableNames = _tableNames,

_currentStream = _currentStream,
_currentTextReader = _currentTextReader,
};
_snapshot = Interlocked.Exchange(ref _cachedSnapshot, null) ?? new Snapshot();

_snapshot._dataReady = _sharedState._dataReady;
_snapshot._haltRead = _haltRead;
_snapshot._metaDataConsumed = _metaDataConsumed;
_snapshot._browseModeInfoConsumed = _browseModeInfoConsumed;
_snapshot._hasRows = _hasRows;
_snapshot._altRowStatus = _altRowStatus;
_snapshot._nextColumnDataToRead = _sharedState._nextColumnDataToRead;
_snapshot._nextColumnHeaderToRead = _sharedState._nextColumnHeaderToRead;
_snapshot._columnDataBytesRead = _columnDataBytesRead;
_snapshot._columnDataBytesRemaining = _sharedState._columnDataBytesRemaining;

// _metadata and _altaMetaDataSetCollection must be Cloned
// before they are updated
_snapshot._metadata = _metaData;
_snapshot._altMetaDataSetCollection = _altMetaDataSetCollection;
_snapshot._tableNames = _tableNames;

_snapshot._currentStream = _currentStream;
_snapshot._currentTextReader = _currentTextReader;

_stateObj.SetSnapshot();
}
Expand Down Expand Up @@ -5186,6 +5194,10 @@ private void CleanupAfterAsyncInvocationInternal(TdsParserStateObject stateObj,
stateObj._permitReplayStackTraceToDiffer = false;
#endif

if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
// We are setting this to null inside the if-statement because stateObj==null means that the reader hasn't been initialized or has been closed (either way _snapshot should already be null)
_snapshot = null;
}
Expand Down Expand Up @@ -5224,6 +5236,10 @@ private void SwitchToAsyncWithoutSnapshot()
Debug.Assert(_snapshot != null, "Should currently have a snapshot");
Debug.Assert(_stateObj != null && !_stateObj._asyncReadWithoutSnapshot, "Already in async without snapshot");

if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
_stateObj.ResetSnapshot();
_stateObj._asyncReadWithoutSnapshot = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,11 @@ internal override void ValidateConnectionForExecute(SqlCommand command)
// or if MARS is off, then a datareader exists
throw ADP.OpenReaderExists(parser.MARSOn);
}
else if (!parser.MARSOn && parser._physicalStateObj._pendingData)
else if (!parser.MARSOn && parser._physicalStateObj.HasPendingData)
{
parser.DrainData(parser._physicalStateObj);
}
Debug.Assert(!parser._physicalStateObj._pendingData, "Should not have a busy physicalStateObject at this point!");
Debug.Assert(!parser._physicalStateObj.HasPendingData, "Should not have a busy physicalStateObject at this point!");

parser.RollbackOrphanedAPITransactions();
}
Expand Down Expand Up @@ -840,7 +840,7 @@ private void ResetConnection()
// obtains a clone.

Debug.Assert(!HasLocalTransactionFromAPI, "Upon ResetConnection SqlInternalConnectionTds has a currently ongoing local transaction.");
Debug.Assert(!_parser._physicalStateObj._pendingData, "Upon ResetConnection SqlInternalConnectionTds has pending data.");
Debug.Assert(!_parser._physicalStateObj.HasPendingData, "Upon ResetConnection SqlInternalConnectionTds has pending data.");

if (_fResetConnection)
{
Expand Down
Loading

0 comments on commit 178fe51

Please sign in to comment.