Skip to content

Commit

Permalink
Fix | Fixed SqlBulkCopy.WriteToServer to ensure successful completion…
Browse files Browse the repository at this point in the history
… of consecutive calls to it. (#2375)
  • Loading branch information
arellegue authored Mar 5, 2024
1 parent 2ddf723 commit 5376a04
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1640,14 +1640,12 @@ public void WriteToServer(DbDataReader reader)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_dbDataReaderRowSource = reader;
_sqlDataReaderRowSource = reader as SqlDataReader;

_dataTableSource = null;
_rowSourceType = ValueSourceType.DbDataReader;

_isAsyncBulkCopy = false;
WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
finally
Expand All @@ -1673,12 +1671,11 @@ public void WriteToServer(IDataReader reader)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = _rowSource as SqlDataReader;
_dbDataReaderRowSource = _rowSource as DbDataReader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.IDataReader;
_isAsyncBulkCopy = false;
WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
finally
Expand Down Expand Up @@ -1707,13 +1704,12 @@ public void WriteToServer(DataTable table, DataRowState rowState)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.DataTable;
_rowEnumerator = table.Rows.GetEnumerator();
_isAsyncBulkCopy = false;

WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
Expand Down Expand Up @@ -1746,16 +1742,14 @@ public void WriteToServer(DataRow[] rows)
try
{
statistics = SqlStatistics.StartTimer(Statistics);

ResetWriteToServerGlobalVariables();
DataTable table = rows[0].Table;
Debug.Assert(null != table, "How can we have rows without a table?");
_rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
_rowSource = rows;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.RowArray;
_rowEnumerator = rows.GetEnumerator();
_isAsyncBulkCopy = false;

WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
Expand Down Expand Up @@ -1787,7 +1781,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok
try
{
statistics = SqlStatistics.StartTimer(Statistics);

ResetWriteToServerGlobalVariables();
if (rows.Length == 0)
{
return cancellationToken.IsCancellationRequested ?
Expand All @@ -1800,7 +1794,6 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok
_rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
_rowSource = rows;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.RowArray;
_rowEnumerator = rows.GetEnumerator();
_isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1834,10 +1827,10 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = reader as SqlDataReader;
_dbDataReaderRowSource = reader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.DbDataReader;
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1871,10 +1864,10 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = _rowSource as SqlDataReader;
_dbDataReaderRowSource = _rowSource as DbDataReader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.IDataReader;
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1914,9 +1907,9 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_sqlDataReaderRowSource = null;
_dataTableSource = table;
_rowSourceType = ValueSourceType.DataTable;
_rowEnumerator = table.Rows.GetEnumerator();
Expand Down Expand Up @@ -3093,5 +3086,17 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
}
return resultTask;
}

private void ResetWriteToServerGlobalVariables()
{
_dataTableSource = null;
_dbDataReaderRowSource = null;
_isAsyncBulkCopy = false;
_rowEnumerator = null;
_rowSource = null;
_rowSourceType = ValueSourceType.Unspecified;
_sqlDataReaderRowSource = null;
_sqlDataReaderRowSource = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,7 @@ public void WriteToServer(DbDataReader reader)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_dbDataReaderRowSource = reader;
_sqlDataReaderRowSource = reader as SqlDataReader;
Expand All @@ -1697,10 +1698,8 @@ public void WriteToServer(DbDataReader reader)
{
_rowSourceIsSqlDataReaderSmi = _sqlDataReaderRowSource is SqlDataReaderSmi;
}
_dataTableSource = null;
_rowSourceType = ValueSourceType.DbDataReader;

_isAsyncBulkCopy = false;
WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
finally
Expand Down Expand Up @@ -1728,16 +1727,15 @@ public void WriteToServer(IDataReader reader)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = _rowSource as SqlDataReader;
if (_sqlDataReaderRowSource != null)
{
_rowSourceIsSqlDataReaderSmi = _sqlDataReaderRowSource is SqlDataReaderSmi;
}
_dbDataReaderRowSource = _rowSource as DbDataReader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.IDataReader;
_isAsyncBulkCopy = false;
WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
finally
Expand Down Expand Up @@ -1768,13 +1766,12 @@ public void WriteToServer(DataTable table, DataRowState rowState)
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.DataTable;
_rowEnumerator = table.Rows.GetEnumerator();
_isAsyncBulkCopy = false;

WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
Expand Down Expand Up @@ -1809,16 +1806,14 @@ public void WriteToServer(DataRow[] rows)
try
{
statistics = SqlStatistics.StartTimer(Statistics);

ResetWriteToServerGlobalVariables();
DataTable table = rows[0].Table;
Debug.Assert(null != table, "How can we have rows without a table?");
_rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
_rowSource = rows;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.RowArray;
_rowEnumerator = rows.GetEnumerator();
_isAsyncBulkCopy = false;

WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
}
Expand Down Expand Up @@ -1851,7 +1846,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok
try
{
statistics = SqlStatistics.StartTimer(Statistics);

ResetWriteToServerGlobalVariables();
if (rows.Length == 0)
{
TaskCompletionSource<object> source = new TaskCompletionSource<object>();
Expand All @@ -1872,7 +1867,6 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok
_rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
_rowSource = rows;
_dataTableSource = table;
_sqlDataReaderRowSource = null;
_rowSourceType = ValueSourceType.RowArray;
_rowEnumerator = rows.GetEnumerator();
_isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1908,10 +1902,10 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = reader as SqlDataReader;
_dbDataReaderRowSource = reader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.DbDataReader;
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1946,10 +1940,10 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowSource = reader;
_sqlDataReaderRowSource = _rowSource as SqlDataReader;
_dbDataReaderRowSource = _rowSource as DbDataReader;
_dataTableSource = null;
_rowSourceType = ValueSourceType.IDataReader;
_isAsyncBulkCopy = true;
resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns Task since _isAsyncBulkCopy = true;
Expand Down Expand Up @@ -1990,9 +1984,9 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat
try
{
statistics = SqlStatistics.StartTimer(Statistics);
ResetWriteToServerGlobalVariables();
_rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
_rowSource = table;
_sqlDataReaderRowSource = null;
_dataTableSource = table;
_rowSourceType = ValueSourceType.DataTable;
_rowEnumerator = table.Rows.GetEnumerator();
Expand Down Expand Up @@ -3212,5 +3206,17 @@ private Task WriteToServerInternalAsync(CancellationToken ctoken)
}
return resultTask;
}

private void ResetWriteToServerGlobalVariables()
{
_dataTableSource = null;
_dbDataReaderRowSource = null;
_isAsyncBulkCopy = false;
_rowEnumerator = null;
_rowSource = null;
_rowSourceType = ValueSourceType.Unspecified;
_sqlDataReaderRowSource = null;
_sqlDataReaderRowSource = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
<Compile Include="SQL\SqlBulkCopyTest\TableLock.cs" />
<Compile Include="SQL\SqlBulkCopyTest\DataConversionErrorMessageTest.cs" />
<Compile Include="SQL\SqlBulkCopyTest\CopyWidenNullInexactNumerics.cs" />
<Compile Include="SQL\SqlBulkCopyTest\WriteToServerTest.cs" />
</ItemGroup>
<ItemGroup Condition="'$(TargetsWindows)' == 'true' AND ('$(TestSet)' == '' OR '$(TestSet)' == '2')">
<Compile Include="SQL\SqlDSEnumeratorTest\SqlDataSourceEnumeratorTest.cs" />
Expand Down
Loading

0 comments on commit 5376a04

Please sign in to comment.