Skip to content

Commit

Permalink
coordinated request observables when aborted could bubble out an Unex…
Browse files Browse the repository at this point in the history
…pectedElasticsearchClientException or a OperationCancelledException, this normalizes what gets passed to OnError (OperationCancelledException) (#4027)

(cherry picked from commit 6b8c63f)
  • Loading branch information
Mpdreamz committed Aug 15, 2019
1 parent 44f4728 commit 457c084
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 5 deletions.
2 changes: 2 additions & 0 deletions src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
{
if (!FirstPoolUsageNeedsSniffing) return;

// TODO cancellationToken could throw here and will bubble out as OperationCancelledException
// everywhere else it would bubble out wrapped in a `UnexpectedElasticsearchClientException`
var success = await semaphore.WaitAsync(_settings.RequestTimeout, cancellationToken).ConfigureAwait(false);
if (!success)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Elasticsearch.Net/Transport/Transport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,9 @@ public async Task<TResponse> RequestAsync<TResponse>(HttpMethod method, string p
}
catch (Exception killerException)
{
if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
pipeline.AuditCancellationRequested();

throw new UnexpectedElasticsearchClientException(killerException, seenExceptions)
{
Request = requestData,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Elasticsearch.Net;

namespace Nest
{
Expand Down Expand Up @@ -27,7 +28,15 @@ protected CoordinatedRequestObserverBase(Action<T> onNext = null, Action<Excepti

public void OnCompleted() => _completed?.Invoke();

public void OnError(Exception error) => _onError?.Invoke(error);
public void OnError(Exception error)
{
// This normalizes task cancellation exceptions for observables
// If a task cancellation happens in the client it bubbles out as a UnexpectedElasticsearchClientException
// where as inside our IObservable implementation we .ThrowIfCancellationRequested() directly.
if (error is UnexpectedElasticsearchClientException es && es.InnerException != null && es.InnerException is OperationCanceledException c)
_onError?.Invoke(c);
else _onError?.Invoke(error);
}

public void OnNext(T value) => _onNext?.Invoke(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ public void CancelBulkAll()
//when we subscribe the observable becomes hot
observableBulk.Subscribe(bulkObserver);

//we wait Nseconds to see some bulks
//we wait N seconds to see some bulks
handle.WaitOne(TimeSpan.FromSeconds(3));
tokenSource.Cancel();
//we wait Nseconds to give in flight request a chance to cancel
//we wait N seconds to give in flight request a chance to cancel
handle.WaitOne(TimeSpan.FromSeconds(3));
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;

if (ex != null && !(ex is OperationCanceledException)) throw ex;

seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
var count = Client.Count<SmallObject>(f => f.Index(index));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Threading;
using System.Threading.Tasks;
using Elastic.Xunit.XunitPlumbing;
using Elasticsearch.Net;
using FluentAssertions;
using Nest;
using Tests.Core.ManagedElasticsearch.Clusters;
Expand Down Expand Up @@ -50,7 +51,8 @@ public void DisposingObservableCancelsBulkAll()
observableBulk.Dispose();
//we wait N seconds to give in flight request a chance to cancel
handle.WaitOne(TimeSpan.FromSeconds(3));
if (ex != null && !(ex is TaskCanceledException) && !(ex is OperationCanceledException)) throw ex;

if (ex != null && !(ex is OperationCanceledException)) throw ex;

seenPages.Should().BeLessThan(pages).And.BeGreaterThan(0);
var count = Client.Count<SmallObject>(f => f.Index(index));
Expand Down

0 comments on commit 457c084

Please sign in to comment.