Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add discard as an option for custom recoverability policies #5327

Merged
merged 6 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ namespace NServiceBus
public static void CustomDiagnosticsWriter(this NServiceBus.EndpointConfiguration config, System.Func<string, System.Threading.Tasks.Task> customDiagnosticsWriter) { }
public static void SetDiagnosticsPath(this NServiceBus.EndpointConfiguration config, string path) { }
}
public sealed class Discard : NServiceBus.RecoverabilityAction
{
public string Reason { get; }
}
public class DistributionPolicy : NServiceBus.IDistributionPolicy
{
public DistributionPolicy() { }
Expand Down Expand Up @@ -750,6 +754,8 @@ namespace NServiceBus
{
protected internal RecoverabilityAction() { }
public static NServiceBus.DelayedRetry DelayedRetry(System.TimeSpan timeSpan) { }
public static NServiceBus.Discard Discard() { }
public static NServiceBus.Discard Discard(string reason) { }
public static NServiceBus.ImmediateRetry ImmediateRetry() { }
public static NServiceBus.MoveToError MoveToError(string errorQueue) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ namespace NServiceBus
public static void CustomDiagnosticsWriter(this NServiceBus.EndpointConfiguration config, System.Func<string, System.Threading.Tasks.Task> customDiagnosticsWriter) { }
public static void SetDiagnosticsPath(this NServiceBus.EndpointConfiguration config, string path) { }
}
public sealed class Discard : NServiceBus.RecoverabilityAction
{
public string Reason { get; }
}
public class DistributionPolicy : NServiceBus.IDistributionPolicy
{
public DistributionPolicy() { }
Expand Down Expand Up @@ -750,6 +754,8 @@ namespace NServiceBus
{
protected internal RecoverabilityAction() { }
public static NServiceBus.DelayedRetry DelayedRetry(System.TimeSpan timeSpan) { }
public static NServiceBus.Discard Discard() { }
public static NServiceBus.Discard Discard(string reason) { }
public static NServiceBus.ImmediateRetry ImmediateRetry() { }
public static NServiceBus.MoveToError MoveToError(string errorQueue) { }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,21 @@ public async Task When_unsupported_action_returned_should_move_to_errors()
Assert.AreEqual(1, eventAggregator.NotificationsRaised.Count);
Assert.AreEqual("message-id", failure.Message.MessageId);
}

[Test]
public async Task When_discard_action_returned_should_discard_message()
{
var recoverabilityExecutor = CreateExecutor(
RetryPolicy.Discard());
var errorContext = CreateErrorContext(messageId: "message-id");

var result = await recoverabilityExecutor.Invoke(errorContext);

Assert.AreEqual(ErrorHandleResult.Handled, result);
Assert.AreEqual(0, eventAggregator.NotificationsRaised.Count);
}

[Test]
public async Task When_moving_to_custom_error_queue_custom_error_queue_address_should_be_set_on_notification()
{
var customErrorQueueAddress = "custom-error-queue";
Expand Down Expand Up @@ -217,6 +231,14 @@ public static Func<RecoverabilityConfig, ErrorContext, RecoverabilityAction> Uns
new UnsupportedAction()
}).Invoke;
}

public static Func<RecoverabilityConfig, ErrorContext, RecoverabilityAction> Discard()
{
return new RetryPolicy(new[]
{
new Discard(),
}).Invoke;
}

Queue<RecoverabilityAction> actions;
}
Expand Down
18 changes: 18 additions & 0 deletions src/NServiceBus.Core/Recoverability/Discard.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace NServiceBus
{
/// <summary>
/// Indicates recoverability is required to discard/ignore the current message.
/// </summary>
public sealed class Discard : RecoverabilityAction
{
internal Discard(string reason = null)
{
Reason = reason;
}

/// <summary>
/// The reason why a message was discarded.
/// </summary>
public string Reason { get; }
}
}
17 changes: 17 additions & 0 deletions src/NServiceBus.Core/Recoverability/RecoverabilityAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,24 @@ public static MoveToError MoveToError(string errorQueue)
Guard.AgainstNullAndEmpty(nameof(errorQueue), errorQueue);
return new MoveToError(errorQueue);
}

/// <summary>
/// Creates a discard recoverability action.
/// </summary>
/// <returns>Discard action.</returns>
public static Discard Discard() => Discard(null);

/// <summary>
/// Creates a discard recoverability action.
/// </summary>
/// <param name="reason">The reason why the message was discarded.</param>
/// <returns>Discard action.</returns>
public static Discard Discard(string reason)
{
return string.IsNullOrEmpty(reason) ? CachedDiscard : new Discard(reason);
}

static ImmediateRetry CachedImmediateRetry = new ImmediateRetry();
static Discard CachedDiscard = new Discard();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public Task<ErrorHandleResult> Invoke(ErrorContext errorContext)
{
var recoveryAction = recoverabilityPolicy(configuration, errorContext);

if (recoveryAction is Discard discard)
{
Logger.Info($"Discarding message with id '{errorContext.Message.MessageId}'.{(string.IsNullOrEmpty(discard.Reason) ? string.Empty : $" Reason: {discard.Reason}")}", errorContext.Exception);
return HandledTask;
}

// When we can't do immediate retries and policy did not honor MaxNumberOfRetries for ImmediateRetries
if (recoveryAction is ImmediateRetry && !immediateRetriesAvailable)
{
Expand Down Expand Up @@ -119,7 +125,8 @@ await eventAggregator.Raise(
bool immediateRetriesAvailable;
bool delayedRetriesAvailable;

static Task<ErrorHandleResult> HandledTask = Task.FromResult(ErrorHandleResult.Handled);
static ILog Logger = LogManager.GetLogger<RecoverabilityExecutor>();
RecoverabilityConfig configuration;
}
}
}