Skip to content

Commit

Permalink
#343 #601 - Fetch Operations Async Calls Fixes | Updates to ExecuteQu…
Browse files Browse the repository at this point in the history
…ery Cancellation Tokens.
  • Loading branch information
mikependon committed Sep 28, 2020
1 parent 99c546f commit 323f77d
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 41 deletions.
67 changes: 31 additions & 36 deletions RepoDb.Core/RepoDb/Extensions/DbConnectionExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public static Task<IEnumerable<dynamic>> ExecuteQueryAsync(this IDbConnection co
int? commandTimeout = null,
IDbTransaction transaction = null,
ICache cache = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
return ExecuteQueryAsyncInternal(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -290,7 +290,7 @@ internal static async Task<IEnumerable<dynamic>> ExecuteQueryAsyncInternal(this
int? commandTimeout = null,
IDbTransaction transaction = null,
ICache cache = null,
CancellationToken? cancellationToken = null,
CancellationToken cancellationToken = default,
string tableName = null,
bool skipCommandArrayParametersCheck = true)
{
Expand Down Expand Up @@ -319,14 +319,14 @@ internal static async Task<IEnumerable<dynamic>> ExecuteQueryAsyncInternal(this
dbFields: dbFields,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck))
{
using (var reader = cancellationToken.HasValue ? await command.ExecuteReaderAsync(cancellationToken.Value) : await command.ExecuteReaderAsync())
using (var reader = await command.ExecuteReaderAsync(cancellationToken))
{
var result = (IEnumerable<dynamic>)DataReader.ToEnumerable(reader, dbFields, connection.GetDbSetting()).AsList();
var result = (await DataReader.ToEnumerableAsync(reader, dbFields, connection.GetDbSetting(), cancellationToken)).AsList();

// Set Cache
if (cacheKey != null)
{
cache?.Add(cacheKey, result, cacheItemExpiration.GetValueOrDefault(), false);
cache?.Add(cacheKey, (IEnumerable<dynamic>)result, cacheItemExpiration.GetValueOrDefault(), false);
}

// Return
Expand Down Expand Up @@ -624,8 +624,8 @@ public static Task<IEnumerable<TResult>> ExecuteQueryAsync<TResult>(this IDbConn
int? cacheItemExpiration = Constant.DefaultCacheItemExpirationInMinutes,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null,
ICache cache = null)
ICache cache = null,
CancellationToken cancellationToken = default)
{
return ExecuteQueryAsyncInternal<TResult>(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -666,8 +666,8 @@ internal static Task<IEnumerable<TResult>> ExecuteQueryAsyncInternal<TResult>(th
int? cacheItemExpiration = Constant.DefaultCacheItemExpirationInMinutes,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null,
ICache cache = null,
CancellationToken cancellationToken = default,
string tableName = null,
bool skipCommandArrayParametersCheck = true)
{
Expand Down Expand Up @@ -695,8 +695,8 @@ internal static Task<IEnumerable<TResult>> ExecuteQueryAsyncInternal<TResult>(th
cacheItemExpiration: cacheItemExpiration,
commandTimeout: commandTimeout,
transaction: transaction,
cancellationToken: cancellationToken,
cache: cache,
cancellationToken: cancellationToken,
tableName: tableName,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck);
}
Expand All @@ -710,8 +710,8 @@ internal static Task<IEnumerable<TResult>> ExecuteQueryAsyncInternal<TResult>(th
cacheItemExpiration: cacheItemExpiration,
commandTimeout: commandTimeout,
transaction: transaction,
cancellationToken: cancellationToken,
cache: cache,
cancellationToken: cancellationToken,
tableName: tableName,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck);
}
Expand Down Expand Up @@ -742,8 +742,8 @@ private static async Task<IEnumerable<TResult>> ExecuteQueryAsyncInternalForDict
int? cacheItemExpiration = Constant.DefaultCacheItemExpirationInMinutes,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null,
ICache cache = null,
CancellationToken cancellationToken = default,
string tableName = null,
bool skipCommandArrayParametersCheck = true)
{
Expand All @@ -766,8 +766,8 @@ private static async Task<IEnumerable<TResult>> ExecuteQueryAsyncInternalForDict
cacheItemExpiration: null,
commandTimeout: commandTimeout,
transaction: transaction,
cancellationToken: cancellationToken,
cache: null,
cancellationToken: cancellationToken,
tableName: tableName,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck)).OfTargetType<dynamic, TResult>();

Expand All @@ -793,8 +793,8 @@ private static async Task<IEnumerable<TResult>> ExecuteQueryAsyncInternalForDict
/// <param name="cacheItemExpiration"></param>
/// <param name="commandTimeout"></param>
/// <param name="transaction"></param>
/// <param name="cancellationToken"></param>
/// <param name="cache"></param>
/// <param name="cancellationToken"></param>
/// <param name="tableName"></param>
/// <param name="skipCommandArrayParametersCheck"></param>
/// <returns></returns>
Expand All @@ -806,8 +806,8 @@ private static async Task<IEnumerable<TResult>> ExecuteQueryAsyncInternalForType
int? cacheItemExpiration = Constant.DefaultCacheItemExpirationInMinutes,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null,
ICache cache = null,
CancellationToken cancellationToken = default,
string tableName = null,
bool skipCommandArrayParametersCheck = true)
{
Expand Down Expand Up @@ -836,15 +836,14 @@ private static async Task<IEnumerable<TResult>> ExecuteQueryAsyncInternalForType
dbFields: dbFields,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck))
{
using (var reader = cancellationToken.HasValue ? await command.ExecuteReaderAsync(cancellationToken.Value) : await command.ExecuteReaderAsync())
using (var reader = await command.ExecuteReaderAsync(cancellationToken))
{
var result = (IEnumerable<TResult>)DataReader.ToEnumerable<TResult>(reader, dbFields,
connection.GetDbSetting()).AsList();
var result = (await DataReader.ToEnumerableAsync<TResult>(reader, dbFields, connection.GetDbSetting(), cancellationToken)).AsList();

// Set Cache
if (cacheKey != null)
{
cache?.Add(cacheKey, result, cacheItemExpiration.GetValueOrDefault(), false);
cache?.Add(cacheKey, (IEnumerable<TResult>)result, cacheItemExpiration.GetValueOrDefault(), false);
}

// Return
Expand Down Expand Up @@ -912,7 +911,7 @@ public static async Task<QueryMultipleExtractor> ExecuteQueryMultipleAsync(this
CommandType? commandType = null,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
// Call
var reader = await ExecuteReaderAsyncInternal(connection: connection,
Expand All @@ -927,7 +926,7 @@ public static async Task<QueryMultipleExtractor> ExecuteQueryMultipleAsync(this
skipCommandArrayParametersCheck: false);

// Return
return new QueryMultipleExtractor((DbDataReader)reader);
return new QueryMultipleExtractor((DbDataReader)reader, cancellationToken);
}

#endregion
Expand Down Expand Up @@ -1046,7 +1045,7 @@ public static Task<IDataReader> ExecuteReaderAsync(this IDbConnection connection
CommandType? commandType = null,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
return ExecuteReaderAsyncInternal(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -1080,7 +1079,7 @@ internal static async Task<IDataReader> ExecuteReaderAsyncInternal(this IDbConne
CommandType? commandType,
int? commandTimeout,
IDbTransaction transaction,
CancellationToken? cancellationToken,
CancellationToken cancellationToken,
Type entityType,
IEnumerable<DbField> dbFields,
bool skipCommandArrayParametersCheck)
Expand All @@ -1101,7 +1100,7 @@ internal static async Task<IDataReader> ExecuteReaderAsyncInternal(this IDbConne
// Ensure the DbCommand disposal
try
{
return cancellationToken.HasValue ? await command.ExecuteReaderAsync(cancellationToken.Value) : await command.ExecuteReaderAsync();
return await command.ExecuteReaderAsync(cancellationToken);
}
catch
{
Expand Down Expand Up @@ -1215,7 +1214,7 @@ public static Task<int> ExecuteNonQueryAsync(this IDbConnection connection,
CommandType? commandType = null,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
return ExecuteNonQueryAsyncInternal(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -1249,7 +1248,7 @@ internal static async Task<int> ExecuteNonQueryAsyncInternal(this IDbConnection
CommandType? commandType,
int? commandTimeout,
IDbTransaction transaction,
CancellationToken? cancellationToken,
CancellationToken cancellationToken,
Type entityType,
IEnumerable<DbField> dbFields,
bool skipCommandArrayParametersCheck)
Expand All @@ -1264,7 +1263,7 @@ internal static async Task<int> ExecuteNonQueryAsyncInternal(this IDbConnection
dbFields: dbFields,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck))
{
return cancellationToken.HasValue ? await command.ExecuteNonQueryAsync(cancellationToken.Value) : await command.ExecuteNonQueryAsync();
return await command.ExecuteNonQueryAsync(cancellationToken);
}
}

Expand Down Expand Up @@ -1366,7 +1365,7 @@ public static Task<object> ExecuteScalarAsync(this IDbConnection connection,
CommandType? commandType = null,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
return ExecuteScalarAsyncInternal(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -1400,7 +1399,7 @@ internal static async Task<object> ExecuteScalarAsyncInternal(this IDbConnection
CommandType? commandType,
int? commandTimeout,
IDbTransaction transaction,
CancellationToken? cancellationToken,
CancellationToken cancellationToken,
Type entityType,
IEnumerable<DbField> dbFields,
bool skipCommandArrayParametersCheck)
Expand All @@ -1415,9 +1414,7 @@ internal static async Task<object> ExecuteScalarAsyncInternal(this IDbConnection
dbFields: dbFields,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck))
{
var result = cancellationToken.HasValue ? await command.ExecuteScalarAsync(cancellationToken.Value) :
await command.ExecuteScalarAsync();
return Converter.DbNullToNull(result);
return Converter.DbNullToNull(await command.ExecuteScalarAsync(cancellationToken));
}
}

Expand Down Expand Up @@ -1522,7 +1519,7 @@ public static Task<TResult> ExecuteScalarAsync<TResult>(this IDbConnection conne
CommandType? commandType = null,
int? commandTimeout = null,
IDbTransaction transaction = null,
CancellationToken? cancellationToken = null)
CancellationToken cancellationToken = default)
{
return ExecuteScalarAsyncInternal<TResult>(connection: connection,
commandText: commandText,
Expand Down Expand Up @@ -1557,7 +1554,7 @@ internal static async Task<TResult> ExecuteScalarAsyncInternal<TResult>(this IDb
CommandType? commandType,
int? commandTimeout,
IDbTransaction transaction,
CancellationToken? cancellationToken,
CancellationToken cancellationToken,
Type entityType,
IEnumerable<DbField> dbFields,
bool skipCommandArrayParametersCheck)
Expand All @@ -1572,9 +1569,7 @@ internal static async Task<TResult> ExecuteScalarAsyncInternal<TResult>(this IDb
dbFields: dbFields,
skipCommandArrayParametersCheck: skipCommandArrayParametersCheck))
{
var result = cancellationToken.HasValue ? await command.ExecuteScalarAsync(cancellationToken.Value) :
await command.ExecuteScalarAsync();
return Converter.ToType<TResult>(result);
return Converter.ToType<TResult>(command.ExecuteScalarAsync(cancellationToken));
}
}

Expand Down
19 changes: 14 additions & 5 deletions RepoDb.Core/RepoDb/QueryMultipleExtractor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

namespace RepoDb
Expand All @@ -23,10 +24,13 @@ public sealed class QueryMultipleExtractor : IDisposable
/// Creates a new instance of <see cref="QueryMultipleExtractor"/> class.
/// </summary>
/// <param name="reader">The <see cref="DbDataReader"/> to be extracted.</param>
internal QueryMultipleExtractor(DbDataReader reader)
/// <param name="cancellationToken">The <see cref="CancellationToken"/> object to be used during the asynchronous operation.</param>
internal QueryMultipleExtractor(DbDataReader reader,
CancellationToken cancellationToken = default)
{
this.reader = reader;
Position = 0;
CancellationToken = cancellationToken;
}

/// <summary>
Expand All @@ -42,6 +46,11 @@ public void Dispose() =>
/// </summary>
public int Position { get; private set; }

/// <summary>
/// Gets the instance of the <see cref="CancellationToken"/> currently in used.
/// </summary>
public CancellationToken CancellationToken { get; private set; }

#endregion

#region Extract
Expand Down Expand Up @@ -74,7 +83,7 @@ public IEnumerable<TEntity> Extract<TEntity>(bool isMoveToNextResult = true)
public async Task<IEnumerable<TEntity>> ExtractAsync<TEntity>(bool isMoveToNextResult = true)
where TEntity : class
{
var result = DataReader.ToEnumerable<TEntity>(reader).AsList();
var result = (await DataReader.ToEnumerableAsync<TEntity>(reader, cancellationToken: CancellationToken)).AsList();
if (isMoveToNextResult)
{
await NextResultAsync();
Expand Down Expand Up @@ -108,7 +117,7 @@ public IEnumerable<dynamic> Extract(bool isMoveToNextResult = true)
/// <returns>An enumerable of extracted data entity.</returns>
public async Task<IEnumerable<dynamic>> ExtractAsync(bool isMoveToNextResult = true)
{
var result = DataReader.ToEnumerable(reader).AsList();
var result = (await DataReader.ToEnumerableAsync(reader, cancellationToken: CancellationToken)).AsList();
if (isMoveToNextResult)
{
await NextResultAsync();
Expand Down Expand Up @@ -153,7 +162,7 @@ public TResult Scalar<TResult>(bool isMoveToNextResult = true)
public async Task<TResult> ScalarAsync<TResult>(bool isMoveToNextResult = true)
{
var value = default(TResult);
if (await reader.ReadAsync())
if (await reader.ReadAsync(CancellationToken))
{
value = Converter.ToType<TResult>(reader[0]);
}
Expand Down Expand Up @@ -202,7 +211,7 @@ public bool NextResult() =>
/// <returns>True if there are more result sets; otherwise false.</returns>
/// </summary>
public async Task<bool> NextResultAsync() =>
(Position = await reader.NextResultAsync() ? Position + 1 : -1) >= 0;
(Position = await reader.NextResultAsync(CancellationToken) ? Position + 1 : -1) >= 0;

#endregion
}
Expand Down
Loading

0 comments on commit 323f77d

Please sign in to comment.