From 0d3f118c8321e6eb52dfc690d2ee6f7768e58c2c Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 19 Oct 2018 17:16:38 +0100 Subject: [PATCH] Spec re #15 --- Directory.Build.props | 2 +- README.md | 40 +++++-- src/CallPolly/BulkheadMulti.fs | 66 ++++++++++ src/CallPolly/CallPolly.fsproj | 1 + src/CallPolly/Config.fs | 15 ++- src/CallPolly/Events.fs | 53 ++++++-- src/CallPolly/Infrastructure.fs | 21 +++- src/CallPolly/Parser.fs | 2 + src/CallPolly/Rules.fs | 77 +++++++++--- tests/CallPolly.Acceptance/Scenarios.fs | 53 +++++++- tests/CallPolly.Tests/Infrastructure.fs | 12 ++ tests/CallPolly.Tests/ParsingTests.fs | 23 ++++ tests/CallPolly.Tests/RulesTests.fs | 153 +++++++++++++++++++++--- 13 files changed, 459 insertions(+), 59 deletions(-) create mode 100644 src/CallPolly/BulkheadMulti.fs diff --git a/Directory.Build.props b/Directory.Build.props index 0b8cc7c..d70dd52 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,6 +1,6 @@ - 0.0.5 + 0.0.6 @jet @bartelink and contributors Jet.com Apply systemwide resilience strategies consistently across subsystems, standing on Polly's shoulders diff --git a/README.md b/README.md index b270210..331bc1a 100644 --- a/README.md +++ b/README.md @@ -18,18 +18,18 @@ CallPolly wraps Polly to provide: ## Non-goals -- low level policy implementations should live elsewhere (see [CONTRIBUTION notes](#contribution-notes)) -- the core CallPolly library faciltates, but should never bind _directly_ to any specific log or metrics emission sink +- low level policy implementations should live elsewhere (see [CONTRIBUTION notes](#contribution-notes)) - _[e.g., `BulkheadMulti` needs to move out](https://github.com/App-vNext/Polly/issues/507)_ +- the core CallPolly library facilitates, but should never bind _directly_ to any specific log or metrics emission sink # Dependencies -The core library extends [`Polly`](https://github.com/App-vNext/Polly) and is intended to work based on `netstandard20` +The core library extends [`Polly`](https://github.com/App-vNext/Polly) and is intended to support running on `netstandard2.0` and `net461`. For reasons of code clarity and performance, a core secondary dependency is [`Serilog`](https://github.com/serilog/serilog); the pervasiveness and low dependency nature of Serilog and the [practical unlimited interop with other loggers/targets/sinks](https://github.com/serilog/serilog/wiki/Provided-Sinks) is considered enough of a win to make this a hard dependency _e.g., if your logger is NLog, it's 2 lines of code to [forward to it](https://www.nuget.org/packages/serilog.sinks.nlog) with minimal perf cost over CallPolly binding to that directly_. -Being written in F#, there's a dependency on `FSharp.Core`. +Being written in F#, there's a dependency on `FSharp.Core` (v4.5 for `netstandard2.0`, or anything >= `3.1.2.5` / F# 3.1 as present in VS 2012 if you're targeting `net461`). -The tests [`xUnit.net`](https://github.com/xunit/xunit), [`FSCheck.xUnit`](https://github.com/fscheck/FsCheck), [`Unquote`](https://github.com/SwensenSoftware/unquote) and [`Serilog.Sinks.Seq`](https://github.com/serilog/serilog-sinks-seq) (to view, see https://getseq.net, which provides a free single user license for clearer insight into log traces). +The tests use [`xUnit.net`](https://github.com/xunit/xunit), [`FSCheck.xUnit`](https://github.com/fscheck/FsCheck), [`Unquote`](https://github.com/SwensenSoftware/unquote) and [`Serilog.Sinks.Seq`](https://github.com/serilog/serilog-sinks-seq) (to view, see https://getseq.net, which provides a free single user license for clearer insight into log traces). The acceptance tests add a reliance on [`Newtonsoft.Json`](https://github.com/JamesNK/Newtonsoft.Json). @@ -50,7 +50,7 @@ In service of this, the assumption is that most extensions to CallPolly should l Yes, there should be a real README with real examples; we'll get there :sweat_smile: -See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/CallPolly.Acceptance/Orchestration.fs#L142) for behavior implied by this configuration: +See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/CallPolly.Acceptance/Scenarios.fs) for behavior implied by this configuration: ``` { "services": { @@ -62,10 +62,10 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca "defaultPolicy": null, "policies": { "quick": [ - { "rule": "Cutoff", "timeoutMs": 1000, "slaMs": 500 } + { "rule": "Cutoff", "timeoutMs": 1000, "slaMs": 500 } ], "slow": [ - { "rule": "Cutoff", "timeoutMs": 10000, "slaMs": 5000 } + { "rule": "Cutoff", "timeoutMs": 10000, "slaMs": 5000 } ] } }, @@ -78,10 +78,10 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca "defaultPolicy": null, "policies": { "default": [ - { "rule": "Limit", "maxParallel": 10, "maxQueue": 3 } + { "rule": "Limit", "maxParallel": 10, "maxQueue": 3 } ], "looser": [ - { "rule": "Limit", "maxParallel": 100, "maxQueue": 300 } + { "rule": "Limit", "maxParallel": 100, "maxQueue": 300 } ], "defaultBroken": [ { "rule": "Isolate" } @@ -95,11 +95,25 @@ See the [acceptance tests](https://github.com/jet/CallPolly/blob/master/tests/Ca "defaultPolicy": null, "policies": { "default": [ - { "rule": "Limit", "maxParallel": 2, "maxQueue": 8 }, - { "rule": "Break", "windowS": 5, "minRequests": 10, "failPct": 20, "breakS": 1 } + { "rule": "Limit", "maxParallel": 2, "maxQueue": 8 }, + { "rule": "Break", "windowS": 5, "minRequests": 10, "failPct": 20, "breakS": 1 }, + { "rule": "Uri", "base": "https://upstreamb" }, + { "rule": "Log", "req": "Always", "res": "Always" } + ] + } +}, +"upstreamC": { + "calls": {}, + "defaultPolicy": "default", + "policies": { + "default": [ + { "rule": "Limit", "maxParallel": 10, "maxQueue": 20 }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientIp" }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientDomain" }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientType" } ] } } } -``` +``` \ No newline at end of file diff --git a/src/CallPolly/BulkheadMulti.fs b/src/CallPolly/BulkheadMulti.fs new file mode 100644 index 0000000..65ac146 --- /dev/null +++ b/src/CallPolly/BulkheadMulti.fs @@ -0,0 +1,66 @@ +module private CallPolly.BulkheadMulti + +open Polly +open System +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +type RefCounted<'T> = { mutable refCount: int; value: 'T } + +// via https://stackoverflow.com/a/31194647/11635 +type private BulkheadPool(gen : unit -> Bulkhead.BulkheadPolicy) = + let inners: Dictionary> = Dictionary() + + let getOrCreateSlot key = + lock inners <| fun () -> + match inners.TryGetValue key with + | true, inner -> + inner.refCount <- inner.refCount + 1 + inner.value + | false, _ -> + let value = gen () + inners.[key] <- { refCount = 1; value = value } + value + let slotReleaseGuard key : IDisposable = + { new System.IDisposable with + member __.Dispose() = + lock inners <| fun () -> + let item = inners.[key] + match item.refCount with + | 1 -> inners.Remove key |> ignore + | current -> item.refCount <- current - 1 } + member __.Execute k f = async { + let x = getOrCreateSlot k + use _ = slotReleaseGuard k + return! f x } + member __.DumpState() : IDictionary = + lock inners <| fun () -> + dict <| seq { for KeyValue(k, { value = v }) in inners -> k,v } + +type BulkheadMultiAsyncPolicy private + ( tag : string, + locks: BulkheadPool, + asyncExecutionPolicy : Func, Context, CancellationToken, bool, Task>) = + inherit Polly.Policy(asyncExecutionPolicy, Seq.empty) + new(tag, maxParallelization, maxQueuingActions, tryGetTagValue, onBulkheadRejected) = + let mkBulkheadForTagValue () = Policy.BulkheadAsync(maxParallelization, maxQueuingActions, fun ctx -> onBulkheadRejected ctx; Task.CompletedTask) + let locks = BulkheadPool(mkBulkheadForTagValue) + let run (inner: Func) (ctx: Context) (ct: CancellationToken) (continueOnCapturedContext: bool) : Task = + match tryGetTagValue ctx with + | None -> inner.Invoke(ctx, ct) + | Some tagVal -> + let startInnerTask (ctx: Context) (ct: CancellationToken) = + inner.Invoke(ctx,ct) + let executeInner (bulkhead: Polly.Bulkhead.BulkheadPolicy) : Async = + bulkhead.ExecuteAsync(Func startInnerTask, ctx, ct, continueOnCapturedContext) + |> Async.AwaitTaskCorrect + Async.StartAsTask(locks.Execute tagVal executeInner, cancellationToken=ct) :> _ + new BulkheadMultiAsyncPolicy(tag, locks, Func,Context,CancellationToken,bool,_> run) + member __.DumpState() : IDictionary = locks.DumpState() + member __.Tag = tag + +type Policy = + /// Placeholder impl of https://github.com/App-vNext/Polly/issues/507 + static member BulkheadMultiAsync(tag, maxParallelization : int, maxQueuingActions : int, tryGetTagValue : Context -> string option, onBulkheadRejected: Context -> unit) = + BulkheadMultiAsyncPolicy(tag, maxParallelization, maxQueuingActions, tryGetTagValue, onBulkheadRejected) \ No newline at end of file diff --git a/src/CallPolly/CallPolly.fsproj b/src/CallPolly/CallPolly.fsproj index 25ad7ea..0020f3f 100644 --- a/src/CallPolly/CallPolly.fsproj +++ b/src/CallPolly/CallPolly.fsproj @@ -11,6 +11,7 @@ + diff --git a/src/CallPolly/Config.fs b/src/CallPolly/Config.fs index aa0b2f3..694dc8c 100644 --- a/src/CallPolly/Config.fs +++ b/src/CallPolly/Config.fs @@ -25,6 +25,11 @@ module Policy = [] dryRun: bool option } + [] + [] + type LimitByInput = + { tag: string; maxParallel: int; maxQueue: int } + [] [] type CutoffInput = @@ -40,6 +45,7 @@ module Policy = type Value = | Break of BreakInput | Limit of LimitInput + | LimitBy of LimitByInput | Cutoff of CutoffInput | Isolate @@ -48,6 +54,7 @@ module Policy = type Rule = | Break of Rules.BreakerConfig | Limit of Rules.BulkheadConfig + | LimitBy of Rules.TaggedBulkheadConfig | Cutoff of Rules.CutoffConfig | Isolate @@ -65,6 +72,11 @@ module Policy = dop = x.maxParallel queue = x.maxQueue dryRun = defaultArg x.dryRun false } + | Input.Value.LimitBy x -> + Rule.LimitBy { + dop = x.maxParallel + queue = x.maxQueue + tag = x.tag } | Input.Value.Cutoff ({ timeoutMs=TimeSpanMs timeout } as x) -> Rule.Cutoff { timeout = timeout @@ -76,8 +88,9 @@ module Policy = | Rule.Isolate -> { s with isolate = true } | Rule.Break breakerConfig -> { s with breaker = Some breakerConfig } | Rule.Limit bulkheadConfig -> { s with limit = Some bulkheadConfig } + | Rule.LimitBy taggedBulkheadConfig -> { s with taggedLimits = s.taggedLimits @ [taggedBulkheadConfig] } | Rule.Cutoff cutoffConfig -> { s with cutoff = Some cutoffConfig } - Seq.fold folder { isolate = false; cutoff = None; limit = None; breaker = None } + Seq.fold folder { isolate = false; cutoff = None; limit = None; taggedLimits = []; breaker = None } let ofInputs xs = xs |> Seq.map interpret |> fold module Http = diff --git a/src/CallPolly/Events.fs b/src/CallPolly/Events.fs index 5c2d42c..28a1553 100644 --- a/src/CallPolly/Events.fs +++ b/src/CallPolly/Events.fs @@ -1,6 +1,7 @@ module CallPolly.Events open System +open System.Collections.Generic /// Represents a time measurement of a computation that includes stopwatch tick metadata [] @@ -15,7 +16,14 @@ module Constants = let [] EventPropertyName = "cpe" type BreakerParams = { window: TimeSpan; minThroughput: int; errorRateThreshold: float } -type BulkheadParams = { dop: int; queue: int } + +[] +type BulkheadParams = + | Any of GlobalBulkheadParams + | Tagged of TaggedBulkheadParams +and [] GlobalBulkheadParams = { dop: int; queue: int } +and [] TaggedBulkheadParams = { tag: string; dop: int; queue: int } + [] type CutoffParams = { timeout: TimeSpan; sla: Nullable } @@ -23,8 +31,8 @@ type CutoffParams = { timeout: TimeSpan; sla: Nullable } type Event = | Isolated of service: string * call: string | Broken of service: string * call: string * config: BreakerParams - | Deferred of service: string * call: string * interval: StopwatchInterval - | Shed of service: string * call: string * config: BulkheadParams + | Deferred of service: string * call: string * tags: IReadOnlyDictionary * interval: StopwatchInterval + | Shed of service: string * call: string * tags: IReadOnlyDictionary * config: BulkheadParams | Breached of service: string * call: string * sla: TimeSpan * interval: StopwatchInterval | Canceled of service: string * call: string * config: CutoffParams * interval: StopwatchInterval override __.ToString() = "(Metrics)" @@ -32,6 +40,12 @@ type Event = module internal Log = open Serilog.Events + // Serilog will grab [only] real Dictionaries (not IDictionary or IReadOnlyDictionary), so we need to wrap them like this for them to be captured as maps + let private makePropertyForCapturableAsMap (rod: IReadOnlyDictionary<_,_>) : Dictionary<_,_> = + let d = Dictionary() + for KeyValue (k,v) in rod do d.Add(k,v) + d + /// Attach a property to the log context to hold the metrics // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 let private forEvent (value : Event) (log : Serilog.ILogger) = @@ -65,15 +79,34 @@ module internal Log = // - in others, the call will eventually fall victim to shedding let queuing (service: string, call:string) (log : Serilog.ILogger) = log.Information("Bulkhead Queuing likely for {service:l}-{call:l}", service, call) - let deferral (service: string, call:string, policy:string) (interval : StopwatchInterval) (concurrencyLimit : int) (log: Serilog.ILogger) = - let lfe = log |> forEvent (Deferred (service, call, interval)) - lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to concurrency limit of {maxParallel} in {policy:l}", - service, call, interval.Elapsed, concurrencyLimit, policy) - let shedding (service: string, call:string, policy:string) (config:BulkheadParams) (log : Serilog.ILogger) = - let lfe = log |> forEvent (Shed (service, call, config)) - lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} based on {policy:l}: {@bulkheadConfig}", service, call, policy, config) + // Deferral can happen due to either a Limit or a LimitBy + let deferral + (service: string, call:string, policy:string, configLimits: IReadOnlyDictionary) + (interval: StopwatchInterval) + (tags: IReadOnlyDictionary) + (log: Serilog.ILogger) = + let lfe = log |> forEvent (Deferred (service,call,tags,interval)) + match tags with + | dict when dict.Count = 0 -> + lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to {@maxParallel} in {policy:l}", + service, call, interval.Elapsed, makePropertyForCapturableAsMap configLimits, policy) + | tags -> + lfe.Warning("Bulkhead Delayed {service:l}-{call:l} for {timespan} due to {@maxParallel} with {@tags} in {policy:l}", + service, call, interval.Elapsed, makePropertyForCapturableAsMap configLimits, makePropertyForCapturableAsMap tags, policy) + // Shedding can happen due to either a Limit or a LimitBy + let shedding (service: string, call: string, policy: string) (tags: IReadOnlyDictionary, config:BulkheadParams) (log : Serilog.ILogger) = + let lfe = log |> forEvent (Shed (service,call,tags,config)) + let flatConfig = match config with Any a -> box a | Tagged t -> box t + match tags with + | dict when dict.Count = 0 -> + lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} based on {policy:l}: {@bulkheadConfig}", + service, call, policy, flatConfig) + | tags -> + lfe.Warning("Bulkhead Shedding for {service:l}-{call:l} with {@tags} based on {policy:l}: {@bulkheadConfig}", + service, call, makePropertyForCapturableAsMap tags, policy, flatConfig) let queuingDryRun (service: string, call:string) (log : Serilog.ILogger) = log.ForContext("dryRun",true).Information("Bulkhead DRYRUN Queuing for {service:l}-{call:l}", service, call) + // Only implemented for Limit, not LimitBy let sheddingDryRun (service: string, call:string) (log : Serilog.ILogger) = log.ForContext("dryRun",true).Warning("Bulkhead DRYRUN Shedding for {service:l}-{call:l}", service, call) diff --git a/src/CallPolly/Infrastructure.fs b/src/CallPolly/Infrastructure.fs index 16360dd..44f1ca5 100644 --- a/src/CallPolly/Infrastructure.fs +++ b/src/CallPolly/Infrastructure.fs @@ -14,7 +14,7 @@ type TimeSpan with type Async with /// - /// Gets the result of given task so that in the event of exception + /// Awaits the Result of given Task so that in the event of exception /// the actual user exception is raised as opposed to being wrapped /// in a System.AggregateException. /// @@ -29,4 +29,23 @@ type Async with else ec e elif t.IsCanceled then ec(new System.Threading.Tasks.TaskCanceledException()) else sc t.Result) + |> ignore) + /// + /// Awaits the provided Task such that in the event of an exception + /// the actual user exception is raised as opposed to being wrapped + /// in a System.AggregateException. + /// + /// Task to be awaited. + [] + static member AwaitTaskCorrect(task : System.Threading.Tasks.Task) : Async = + Async.FromContinuations(fun (sc,ec,cc) -> + task.ContinueWith(fun (task:System.Threading.Tasks.Task) -> + if task.IsFaulted then + let e = task.Exception + if e.InnerExceptions.Count = 1 then ec e.InnerExceptions.[0] + else ec e + elif task.IsCanceled then + ec(System.Threading.Tasks.TaskCanceledException()) + else + sc ()) |> ignore) \ No newline at end of file diff --git a/src/CallPolly/Parser.fs b/src/CallPolly/Parser.fs index 57e6fd1..cb462b4 100644 --- a/src/CallPolly/Parser.fs +++ b/src/CallPolly/Parser.fs @@ -65,6 +65,7 @@ and [] | Isolate | Break of Config.Policy.Input.BreakInput | Limit of Config.Policy.Input.LimitInput + | LimitBy of Config.Policy.Input.LimitByInput | Cutoff of Config.Policy.Input.CutoffInput (* Config.Http.Input.Value *) @@ -117,6 +118,7 @@ let parseInternal defsJson : ParsedService [] = | Input.Isolate -> yield ParsedRule.Policy Config.Policy.Input.Value.Isolate | Input.Break x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Break x) | Input.Limit x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Limit x) + | Input.LimitBy x -> yield ParsedRule.Policy (Config.Policy.Input.Value.LimitBy x) | Input.Cutoff x -> yield ParsedRule.Policy (Config.Policy.Input.Value.Cutoff x) | Input.Unknown x -> yield ParsedRule.Unknown x diff --git a/src/CallPolly/Rules.fs b/src/CallPolly/Rules.fs index 5346b42..a2affb4 100644 --- a/src/CallPolly/Rules.fs +++ b/src/CallPolly/Rules.fs @@ -2,8 +2,9 @@ open Polly open System -open System.Collections.Generic open System.Collections.Concurrent +open System.Collections.Generic +open System.Collections.ObjectModel open System.Diagnostics open System.Threading open System.Threading.Tasks @@ -14,17 +15,35 @@ module private Option = type BreakerConfig = { window: TimeSpan; minThroughput: int; errorRateThreshold: float; retryAfter: TimeSpan; dryRun: bool } type BulkheadConfig = { dop: int; queue: int; dryRun: bool } +type TaggedBulkheadConfig = { tag: string; dop: int; queue: int } type CutoffConfig = { timeout: TimeSpan; sla: TimeSpan option; dryRun: bool } type PolicyConfig = { // Prettify DumpState diagnostic output - this structure is not roundtripped but isolated circuits stand out better when rare [] isolate: bool - cutoff: CutoffConfig option; limit: BulkheadConfig option; breaker: BreakerConfig option } - -type GovernorState = { circuitState : string option; bulkheadAvailable : int option; queueAvailable : int option } + cutoff: CutoffConfig option; limit: BulkheadConfig option; taggedLimits: TaggedBulkheadConfig list; breaker: BreakerConfig option } + +type GovernorState = + { circuitState : string option; + bulkheadAvailable : int option; queueAvailable : int option + // sic - array, rendered as null so it does not get rendered where not applicable + taggedAvailabilities : TaggedBulkheadState[] } +and TaggedBulkheadState = + { tag: string + items: MultiBulkheadState list } +and MultiBulkheadState = + { value: string + bulkheadAvailable : int + queueAvailable : int } + +let emptyRod : IReadOnlyDictionary = ReadOnlyDictionary(Dictionary()) :> _ type Polly.Context with member __.Log = __.Item("log") :?> Serilog.ILogger + member __.Tags : IReadOnlyDictionary = + match __.TryGetValue "tags" with + | true, (:? IReadOnlyDictionary as v) -> v + | _ -> emptyRod /// Translates a PolicyConfig's rules to a Polly IAsyncPolicy instance that gets held in the ActionPolicy type Governor @@ -46,7 +65,12 @@ type Governor Some <| Policy.TimeoutAsync(cutoff.timeout) | _ -> None let logQueuing log = log |> Events.Log.queuing (serviceName, callName) - let logDeferral log interval concurrencyLimit = log |> Events.Log.deferral (serviceName, callName, policyName) interval concurrencyLimit + let logDeferral log interval tags = + let mkRod pairs : IReadOnlyDictionary = System.Collections.ObjectModel.ReadOnlyDictionary(pairs |> dict) :> _ + let configLimits = mkRod <| seq { + match config.limit with None -> () | Some limit -> yield "any", limit.dop + for limitBy in config.taggedLimits do yield limitBy.tag, limitBy.dop } + log |> Events.Log.deferral (serviceName, callName, policyName, configLimits) interval tags let logShedding log config = log |> Events.Log.shedding (serviceName, callName, policyName) config let logSheddingDryRun log = log |> Events.Log.sheddingDryRun (serviceName, callName) let logQueuingDryRun log = log |> Events.Log.queuingDryRun (serviceName, callName) @@ -54,10 +78,19 @@ type Governor match config.limit with | None -> None | Some limit -> - let logRejection (c: Context) : Task = logShedding c.Log { dop = limit.dop; queue = limit.queue } ; Task.CompletedTask + let logRejection (c: Context) : Task = logShedding c.Log (c.Tags, Events.Any {dop = limit.dop; queue = limit.queue }) ; Task.CompletedTask let effectiveLimit = if limit.dryRun then Int32.MaxValue else limit.dop // https://github.com/App-vNext/Polly/issues/496#issuecomment-420183946 stateLog.Debug("Establishing BulkheadAsync for {service:l}-{call:l} {effectiveLimit}+{queue}", serviceName, callName, effectiveLimit, limit.queue) Some <| Policy.BulkheadAsync(maxParallelization = effectiveLimit, maxQueuingActions = limit.queue, onBulkheadRejectedAsync = logRejection) + let multiBulkheads : BulkheadMulti.BulkheadMultiAsyncPolicy list = + [ for limitBy in config.taggedLimits -> + let logRejection (c: Context) = logShedding c.Log (c.Tags, Events.Tagged { tag = limitBy.tag; dop = limitBy.dop; queue = limitBy.queue }) + stateLog.Debug("Establishing BulkheadAsyncMulti by {tag} for {service:l}-{call:l} {effectiveLimit}+{queue}", limitBy.tag, serviceName, callName, limitBy.dop, limitBy.queue) + let tryGetTagValue (c: Context) = + match c.Tags.TryGetValue limitBy.tag with + | true, value -> Some value + | _ -> None + BulkheadMulti.Policy.BulkheadMultiAsync(tag = limitBy.tag, maxParallelization = limitBy.dop, maxQueuingActions = limitBy.queue, tryGetTagValue=tryGetTagValue, onBulkheadRejected = logRejection) ] let logBreaking (exn : exn) (timespan: TimeSpan) = match config with | { isolate = true } -> () @@ -96,6 +129,7 @@ type Governor let asyncPolicy : IAsyncPolicy option = [| match maybeCutoff with Some x -> yield x :> IAsyncPolicy | _ -> () match maybeBulkhead with Some x -> yield x :> IAsyncPolicy | _ -> () + for b in multiBulkheads do yield b :> IAsyncPolicy match maybeCb with Some x -> yield x :> IAsyncPolicy | _ -> () |] |> function | [||] -> @@ -128,7 +162,7 @@ type Governor Some () // Compiler gets too clever if we never return Some /// Execute and/or log failures regarding invocation of a function with the relevant policy applied - member __.Execute(inner : Async<'a>, ?log) : Async<'a> = + member __.Execute(inner : Async<'a>, ?log : Serilog.ILogger, ?tags: IReadOnlyDictionary) : Async<'a> = let callLog = defaultArg log stateLog match asyncPolicy with | None -> @@ -161,11 +195,11 @@ type Governor let activeCount = Int32.MaxValue - bh.BulkheadAvailableCount if activeCount > limit.dop + limit.queue then logSheddingDryRun callLog elif activeCount > limit.dop && wasFull then logQueuingDryRun callLog - | { limit = Some ({ dryRun = false } as limit) }, _ -> + | { limit = Some { dryRun = false } }, _ -> let commenceProcessingTicks = Stopwatch.GetTimestamp() let deferralInterval = Events.StopwatchInterval(startTicks, commenceProcessingTicks) if (let e = deferralInterval.Elapsed in e.TotalMilliseconds) > 1. then - logDeferral callLog deferralInterval limit.dop + logDeferral callLog deferralInterval (defaultArg tags emptyRod) | _ -> () // sic - cancellation of the inner computation needs to be controlled by Polly's chain of handlers @@ -174,8 +208,12 @@ type Governor let execute = async { let! ct = Async.CancellationToken // Grab async cancellation token of this `Execute` call, so cancellation gets propagated into the Polly [wrap] callLog.Debug("Policy Execute Inner {service:l}-{call:l}", serviceName, callName) - let ctx = Seq.singleton ("log", box callLog) |> dict - try return! polly.ExecuteAsync(startInnerTask, ctx, ct) |> Async.AwaitTaskCorrect + let contextData = dict <| seq { + yield ("log", box callLog) + match tags with + | None -> () + | Some (t : IReadOnlyDictionary) -> yield "tags", box t } + try return! polly.ExecuteAsync(startInnerTask, contextData, ct) |> Async.AwaitTaskCorrect // TODO find/add a cleaner way to use the Polly API to log when the event fires due to the the circuit being Isolated/Broken with LogWhenRejectedFilter callLog jitProcessingInterval -> return! invalidOp "not possible; Filter always returns None" } match config.cutoff with @@ -197,7 +235,18 @@ type Governor member __.InternalState : GovernorState = { circuitState = maybeCb |> Option.map (fun cb -> string cb.CircuitState) bulkheadAvailable = maybeBulkhead |> Option.map (fun bh -> bh.BulkheadAvailableCount) - queueAvailable = maybeBulkhead |> Option.map (fun bh -> bh.QueueAvailableCount) } + queueAvailable = maybeBulkhead |> Option.map (fun bh -> bh.QueueAvailableCount) + taggedAvailabilities = + match multiBulkheads with + | [] -> null + | tbhs -> + [| for tbh in tbhs -> + { tag = tbh.Tag + items = + [ for KeyValue(tagValue,bh) in tbh.DumpState() -> + { value = tagValue + bulkheadAvailable = bh.BulkheadAvailableCount + queueAvailable = bh.QueueAvailableCount } ] } |]} type [] ChangeLevel = Added | ConfigurationAndPolicy | Configuration | Policy @@ -230,8 +279,8 @@ type CallPolicy<'TConfig when 'TConfig: equality> (makeGoverner : CallConfig<'TC member __.Config = cfg.config /// Execute the call, apply the policy rules - member __.Execute(inner : Async<'t>, ?log) = - governor.Execute(inner,?log=log) + member __.Execute(inner : Async<'t>, ?log, ?tags) = + governor.Execute(inner, ?log=log, ?tags=tags) /// Facilitates dumping for diagnostics member __.InternalState = diff --git a/tests/CallPolly.Acceptance/Scenarios.fs b/tests/CallPolly.Acceptance/Scenarios.fs index c895ea8..08a4618 100644 --- a/tests/CallPolly.Acceptance/Scenarios.fs +++ b/tests/CallPolly.Acceptance/Scenarios.fs @@ -68,6 +68,18 @@ let policy = """{ { "rule": "Log", "req": "Always", "res": "Always" } ] } + }, + "upstreamC": { + "calls": {}, + "defaultPolicy": "default", + "policies": { + "default": [ + { "rule": "Limit", "maxParallel": 10, "maxQueue": 20 }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientIp" }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientDomain" }, + { "rule": "LimitBy","maxParallel": 2, "maxQueue": 4, "tag": "clientType" } + ] + } } } } @@ -94,7 +106,8 @@ type Act = type Sut(log : Serilog.ILogger, policy: CallPolly.Rules.Policy<_>) = let run serviceName callName f = policy.Find(serviceName, callName).Execute(f) - let runLog callLog serviceName callName f = policy.Find(serviceName, callName).Execute(f, callLog) + let runLog callLog serviceName callName f = policy.Find(serviceName, callName).Execute(f, log=callLog) + let runWithTags serviceName callName tags f = policy.Find(serviceName, callName).Execute(f, tags=tags) let _upstreamA1 (a : Act) = async { log.Information("A1") @@ -130,6 +143,16 @@ type Sut(log : Serilog.ILogger, policy: CallPolly.Rules.Policy<_>) = return! upstreamB1 a } + let _upstreamC1 (a : Act) = async { + log.Information("C1") + return! a.Execute() } + let upstreamC1 tags a = runWithTags "upstreamC" "Call1" tags <| _upstreamC1 a + + let _apiC tags (a : Act) = async { + log.Information "ApiC" + return! upstreamC1 tags a + } + member __.ApiOneSecondSla a1 a2 = run "ingres" "api-a" <| _apiA a1 a2 member __.ApiOneSecondSlaLog callLog a1 a2 = runLog callLog "ingres" "api-a" <| _apiA a1 a2 @@ -137,6 +160,10 @@ type Sut(log : Serilog.ILogger, policy: CallPolly.Rules.Policy<_>) = member __.ApiManualBroken = _apiABroken + member __.ApiMulti tags (a : Act) = + let tags = tags |> dict |> System.Collections.ObjectModel.ReadOnlyDictionary + run "ingres" "api-a" <| _apiC tags a + let (|Http200|Http500|Http502|Http503|Http504|) : Choice -> _ = function | Choice1Of2 _ -> Http200 | Choice2Of2 (:? Polly.ExecutionRejectedException) -> Http503 @@ -264,14 +291,32 @@ type Scenarios(output : Xunit.Abstractions.ITestOutputHelper) = let [] ``Limit - Bulkhead functionality`` () = async { let policy = Parser.parse(policy).CreatePolicy log let sut = Sut(log, policy) - // 10 fails put it into circuit breaking mode - let's do 9 and the step carefully let alternateBetweenTwoUpstreams i = if i % 2 = 0 then sut.ApiOneSecondSla Succeed (DelayMs 100) else sut.ApiOneSecondSla (DelayMs 100) Succeed |> Async.Catch let! time, res = List.init 1000 alternateBetweenTwoUpstreams |> Async.Parallel |> Stopwatch.Time - let counts = res |> Seq.countBy (function Status s -> s) |> Seq.sortBy fst |> List.ofSeq - test <@ match counts with [200,successCount; 503,rejectCount] -> successCount < 100 && rejectCount > 800 | x -> failwithf "%A" x @> + let counts = res |> Seq.countBy (|Status|) |> Seq.sortBy fst |> List.ofSeq + test <@ match counts with + | [200,successCount; 503,rejectCount] -> successCount < 150 && rejectCount >= 850 + | x -> failwithf "%A" x @> + test <@ between 0.3 2.5 (let t = time.Elapsed in t.TotalSeconds) @> + } + + let [] ``LimitBy - BulkheadMulti functionality`` () = async { + let policy = Parser.parse(policy).CreatePolicy log + let sut = Sut(log, policy) + let act i = + match i % 3 with + | 0 -> sut.ApiMulti ["clientIp","A"] (DelayMs 100) + | 1 -> sut.ApiMulti ["clientDomain","A"] (DelayMs 100) + | _ -> sut.ApiMulti ["clientType","A"] (DelayMs 100) + |> Async.Catch + let! time, res = List.init 1000 act |> Async.Parallel |> Stopwatch.Time + let counts = res |> Seq.countBy (|Status|) |> Seq.sortBy fst |> List.ofSeq + test <@ match counts with + | [200,successCount; 503,rejectCount] -> successCount < 20 && rejectCount > 980 + | x -> failwithf "%A" x @> test <@ between 0.3 2.5 (let t = time.Elapsed in t.TotalSeconds) @> } diff --git a/tests/CallPolly.Tests/Infrastructure.fs b/tests/CallPolly.Tests/Infrastructure.fs index 6ee8b36..021a6b8 100644 --- a/tests/CallPolly.Tests/Infrastructure.fs +++ b/tests/CallPolly.Tests/Infrastructure.fs @@ -72,6 +72,7 @@ module SerilogHelpers = open Serilog open Serilog.Events open System.Collections.Concurrent + open System.Collections.Generic /// Create a logger, targeting the specified outputs [] @@ -114,6 +115,9 @@ module SerilogHelpers = __.Clear() actual + let (|SerilogDict|_|) : Serilog.Events.LogEventPropertyValue -> IReadOnlyDictionary option = function + | (:? Events.DictionaryValue as x) -> Some x.Elements + | _ -> None let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function | (:? ScalarValue as x) -> Some x.Value | _ -> None @@ -126,6 +130,14 @@ module SerilogHelpers = else None let (|SerilogString|_|) : LogEventPropertyValue -> string option = function SerilogScalar (:? string as y) -> Some y | _ -> None let (|SerilogBool|_|) : LogEventPropertyValue -> bool option = function SerilogScalar (:? bool as y) -> Some y | _ -> None + let (|MaybeMap|) (name : string) (e : LogEvent) : (string * string) list = + match e.Properties.TryGetValue name with + | true, (SerilogDict s) -> + [ for t in s do + match t with + | KeyValue (k,SerilogString v) -> yield (k.Value :?> string),v + | _ -> () ] + | _ -> List.empty let dumpEvent (x : LogEvent) = let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] {Message}{NewLine}{Exception}|{Properties}", null); diff --git a/tests/CallPolly.Tests/ParsingTests.fs b/tests/CallPolly.Tests/ParsingTests.fs index 968cd9b..cba20b2 100644 --- a/tests/CallPolly.Tests/ParsingTests.fs +++ b/tests/CallPolly.Tests/ParsingTests.fs @@ -101,6 +101,29 @@ type Parsing(output : Xunit.Abstractions.ITestOutputHelper) = raisesWith <@ Parser.parse defs @> (fun x -> <@ x.Message.StartsWith "Include Rule at 'default->(default)->pol' refers recursively to 'pol' (policies: " @>) + +/// Testing derivation of Policy info +type PolicyParsing(output : Xunit.Abstractions.ITestOutputHelper) = + let log = LogHooks.createLogger output + + let [] ``Multiple limit rules stack correctly`` () : unit = + let defs = """{ "services": { "svc": { + "calls": { "call": "default" }, + "defaultPolicy": null, + "policies": { + "default" : [ + { "rule": "LimitBy", "tag": "clientDomain", "maxParallel": 3, "maxQueue": 2 }, + { "rule": "LimitBy", "maxParallel": 2, "maxQueue": 3, "tag": "clientIp" } + ] + } +}}}""" + + let res = Parser.parse(defs).CreatePolicy log + let limits = trap <@ res.TryFind("svc","call").Value.Policy.taggedLimits @> + let first : Rules.TaggedBulkheadConfig = { tag="clientDomain"; dop=3; queue=2 } + let second : Rules.TaggedBulkheadConfig = { tag="clientIp"; dop=2; queue=3 } + test <@ [ first; second ] = limits @> + /// Testing derivation of Config info type ConfigParsing(output : Xunit.Abstractions.ITestOutputHelper) = let log = LogHooks.createLogger output diff --git a/tests/CallPolly.Tests/RulesTests.fs b/tests/CallPolly.Tests/RulesTests.fs index 141d41c..4ffaf55 100644 --- a/tests/CallPolly.Tests/RulesTests.fs +++ b/tests/CallPolly.Tests/RulesTests.fs @@ -45,6 +45,9 @@ module Core = "limit": [ { "rule": "Limit", "maxParallel": 2, "maxQueue": 3, "dryRun": true } ], + "limitBy": [ + { "rule": "LimitBy","maxParallel": 3, "maxQueue": 2, "tag": "clientIp", "dryRun": true } + ], "break": [ { "rule": "Break", "windowS": 5, "minRequests": 100, "failPct": 20, "breakS": 1 } ], @@ -57,6 +60,7 @@ module Core = "heavy": [ { "rule": "Include","policy": "defaultLog" }, { "rule": "Include","policy": "limit" }, + { "rule": "Include","policy": "limitBy" }, { "rule": "Include","policy": "break" }, { "rule": "Include","policy": "cutoff" }, { "rule": "Sla", "slaMs": 5000, "timeoutMs": 10000 }, @@ -72,15 +76,15 @@ module Core = let limitParsed = mkPolicy <| Config.Policy.Input.Value.Limit { maxParallel=2; maxQueue=3; dryRun=Some true } let limitConfig : Rules.BulkheadConfig = { dop=2; queue=3; dryRun=true } - let limitPolicy = Config.Policy.Rule.Limit limitConfig + + let limitByParsed = mkPolicy <| Config.Policy.Input.Value.LimitBy { tag="clientIp"; maxParallel=3; maxQueue=2 } + let limitByConfig : Rules.TaggedBulkheadConfig = { tag="clientIp"; dop=3; queue=2 } let breakParsed = mkPolicy <| Config.Policy.Input.Value.Break { windowS = 5; minRequests = 100; failPct=20.; breakS = 1.; dryRun = None } let breakConfig : Rules.BreakerConfig = { window = s 5; minThroughput = 100; errorRateThreshold = 0.2; retryAfter = s 1; dryRun = false } - let breakPolicy = Config.Policy.Rule.Break breakConfig let cutoffParsed = mkPolicy <| Config.Policy.Input.Value.Cutoff { timeoutMs = 500; slaMs = None; dryRun = Some true } let cutoffConfig : Rules.CutoffConfig = { timeout = ms 500; sla = None; dryRun = true } - let cutoffPolicy = Config.Policy.Rule.Cutoff cutoffConfig let isolateParsed = mkPolicy <| Config.Policy.Input.Value.Isolate @@ -93,12 +97,12 @@ module Core = { timeout = Some (s 5); sla = Some (s 1) ``base`` = Some baseUri; rel = None reqLog = Config.Http.LogLevel.OnlyWhenDebugEnabled; resLog = Config.Http.LogLevel.OnlyWhenDebugEnabled } - let noPolicy = { isolate = false; cutoff = None; limit = None; breaker = None; } + let noPolicy = { isolate = false; cutoff = None; limit = None; taggedLimits = []; breaker = None; } let heavyConfig : Config.Http.Configuration = { timeout = Some (s 10); sla = Some (s 5) ``base`` = None; rel = None reqLog = Config.Http.LogLevel.OnlyWhenDebugEnabled; resLog = Config.Http.LogLevel.OnlyWhenDebugEnabled } - let heavyRules = { isolate = true; cutoff = Some cutoffConfig; limit = Some limitConfig; breaker = Some breakConfig } + let heavyRules = { isolate = true; cutoff = Some cutoffConfig; limit = Some limitConfig; taggedLimits = [limitByConfig]; breaker = Some breakConfig } let mkParsedSla sla timeout = Parser.ParsedRule.Http (Config.Http.Input.Value.Sla {slaMs = sla; timeoutMs = timeout }) /// Base tests exercising core functionality @@ -117,7 +121,7 @@ module Core = && noPolicy = edd.Policy @> let poRaw = findRaw "placeOrder" let po = trap <@ tryFindActionRules "placeOrder" |> Option.get @> - test <@ [logParsed; limitParsed; breakParsed; cutoffParsed; mkParsedSla 5000 10000; isolateParsed] = poRaw + test <@ [logParsed; limitParsed; limitByParsed; breakParsed; cutoffParsed; mkParsedSla 5000 10000; isolateParsed] = poRaw && heavyConfig = po.Config && heavyRules = po.Policy @> test <@ None = tryFindActionRules "missing" @> @@ -130,7 +134,7 @@ module Core = let default_ = findRaw "(default)" test <@ baseParsed :: default_ = findRaw "EstimatedDeliveryDates" @> test <@ [isolateParsed] = findRaw "EstimatedDeliveryDate" @> - test <@ defaultLog @ [limitParsed; breakParsed; cutoffParsed; mkParsedSla 5000 10000; isolateParsed] = findRaw "placeOrder" @> + test <@ defaultLog @ [limitParsed; limitByParsed; breakParsed; cutoffParsed; mkParsedSla 5000 10000; isolateParsed] = findRaw "placeOrder" @> let heavyPolicy = pol.Find("default","placeOrder") test <@ heavyPolicy.Policy.isolate && Some breakConfig = heavyPolicy.Policy.breaker @> @@ -165,8 +169,8 @@ module SerilogExtractors = type LogEvent = | Isolated of service: string * call: string * policy: string | Broken of service: string * call: string * policy: string - | Deferred of service: string * call: string * policy: string * duration: TimeSpan - | Shed of service: string * call: string * policy: string + | Deferred of service: string * call: string * policy: string * tags: (string*string) list * duration: TimeSpan + | Shed of service: string * call: string * policy: string * tags: (string*string) list | Breached of service: string * call: string * duration: TimeSpan | Canceled of service: string * call: string * duration: TimeSpan | Status of string * StatusEvent @@ -192,18 +196,20 @@ module SerilogExtractors = | TemplateContains "Bulkhead Queuing likely for " & HasProp "call" (SerilogString an) -> Call(an, MaybeQueuing) | TemplateContains "Bulkhead DRYRUN Queuing " & HasProp "call" (SerilogString an) -> Call(an, QueuingDryRun) | TemplateContains "Bulkhead DRYRUN Shedding " & HasProp "call" (SerilogString an) -> Call(an, SheddingDryRun) - | CallPollyEvent (Events.Event.Deferred (_service,eCall,eInterval)) + | CallPollyEvent (Events.Event.Deferred (_service,eCall,_tags,eInterval)) & HasProp "service" (SerilogString service) & HasProp "call" (SerilogString call) & HasProp "policy" (SerilogString policy) + & MaybeMap "tags" maybeTags when eCall = call -> - Deferred (service,call,policy, eInterval.Elapsed) - | CallPollyEvent (Events.Event.Shed (_service,eCall,_config)) + Deferred (service,call,policy,maybeTags, eInterval.Elapsed) + | CallPollyEvent (Events.Event.Shed (_service,eCall,_tags,_config)) & HasProp "service" (SerilogString service) & HasProp "call" (SerilogString call) & HasProp "policy" (SerilogString policy) + & MaybeMap "tags" maybeTags when eCall = call -> - Shed (service,call,policy) + Shed (service,call,policy,maybeTags) | CallPollyEvent (Events.Event.Breached (_service,eCall,_sla,interval)) & HasProp "service" (SerilogString service) & HasProp "call" (SerilogString call) @@ -442,8 +448,8 @@ type Limit(output : Xunit.Abstractions.ITestOutputHelper) = let evnts = buffer.Take() let queuedOrShed = function | Call ("(default)",MaybeQueuing) as x -> Choice1Of3 x - | Deferred ("default","(default)","def",delay) as x -> Choice2Of3 delay - | Shed ("default","(default)","def") as x -> Choice3Of3 x + | Deferred ("default","(default)","def",[], delay) as x -> Choice2Of3 delay + | Shed ("default","(default)","def",[]) as x -> Choice3Of3 x | x -> failwithf "Unexpected event %A" x let queued,waited,shed = evnts |> Seq.map queuedOrShed |> Choice.partition3 let delayed = waited |> Array.filter (fun x -> x > ms 500) @@ -452,6 +458,123 @@ type Limit(output : Xunit.Abstractions.ITestOutputHelper) = && between 2 3 (Array.length delayed) // Typically, 3 should get delayed, but depending on scheduling, only 2 get logged as such, and we don't want a flickering test && 1 = Array.length shed @> } +type LimitBy(output : Xunit.Abstractions.ITestOutputHelper) = + let log, buffer = LogHooks.createLoggerWithCapture output + + let limitByOnly = """{ "services": { "default": { + "calls": {}, + "defaultPolicy": "def", + "policies": { + "def": [ + { "rule": "LimitBy", "maxParallel": 2, "maxQueue": 3, "tag": "clientIp" }, + { "rule": "LimitBy", "maxParallel": 2, "maxQueue": 3, "tag": "clientDomain" } + ] + } +}}}""" + let expectedRule : Rules.TaggedBulkheadConfig = { dop = 2; queue = 3; tag="clientIp" } + let expectedDomainRule : Rules.TaggedBulkheadConfig = { dop = 2; queue = 3; tag="clientDomain" } + + let runOk = async { + do! Async.Sleep (s 1) + return 42 } + + let [] ``Enforces queueing and sheds load above limit whenever there is contention on a tag`` () = async { + let pol = Parser.parse(limitByOnly).CreatePolicy log + let ap = pol.Find("default","any") + test <@ [expectedRule; expectedDomainRule] = ap.Policy.taggedLimits @> + let contending = + let alternateDomainVsIpContention i = + match i % 2 with + | 0 -> ["clientIp","A"] + | _ -> ["clientIp",string i; "clientDomain","B"] + Seq.replicate 12 runOk + |> Seq.zip (Seq.init 12 alternateDomainVsIpContention) + let notContending = + let alternateUnderRadar i = + match i % 3 with + | 0 -> "C" + | 1 -> "D" + | _ -> "E" + Seq.replicate 12 runOk + |> Seq.zip (Seq.init 12 alternateUnderRadar |> Seq.map (fun k -> ["clientIp",k])) + let mkTags tags = + tags |> dict |> System.Collections.ObjectModel.ReadOnlyDictionary + let! time, results = + Seq.append notContending contending + // Catch inside so the first throw doesnt cancel the overall execution + |> Seq.map(fun (tags, op) -> ap.Execute(op, tags=mkTags tags) |> Async.Catch) + |> Async.Parallel + |> Stopwatch.Time + let oks, errs = Choice.partition results + test <@ 10+12 = Array.length oks + && time.Elapsed > s 2 && time.Elapsed < s 4 + && 2 = Array.length errs + && errs |> Array.forall (function :? Polly.Bulkhead.BulkheadRejectedException -> true | _ -> false) @> + let evnts = buffer.Take() + let shed = function + | Shed ("default","(default)","def",tags) -> Some tags + | x -> failwithf "Unexpected event %A" x + let shed = evnts |> Seq.choose shed |> Array.ofSeq + test <@ 2 = Array.length shed + && shed |> Array.exists (List.contains ("clientIp","A")) + && shed |> Array.exists (List.contains ("clientDomain","B")) @> } + + let limitWithLimitBy = """{ "services": { "default": { + "calls": {}, + "defaultPolicy": "def", + "policies": { + "def": [ + { "rule": "Limit", "maxParallel": 24, "maxQueue": 0 }, + { "rule": "LimitBy", "maxParallel": 2, "maxQueue": 3, "tag": "clientIp" } + ] + } +}}}""" + + let [] ``When allied with a limit rule, sheds load above limit, and provides clear logging`` () = async { + let pol = Parser.parse(limitWithLimitBy).CreatePolicy log + let ap = pol.Find("default","any") + test <@ [expectedRule] = ap.Policy.taggedLimits @> + let contending = + let alternateIps i = + match i % 2 with + | 0 -> "A" + | _ -> "B" + Seq.replicate 12 runOk + |> Seq.zip (Seq.init 12 alternateIps) + let notContending = + let alternateUnderRadar i = + match i % 3 with + | 0 -> "C" + | 1 -> "D" + | _ -> "E" + Seq.replicate 12 runOk + |> Seq.zip (Seq.init 12 alternateUnderRadar) + let mkTags tags = + tags |> dict |> System.Collections.ObjectModel.ReadOnlyDictionary + let! time, results = + Seq.append notContending contending + // Catch inside so the first throw doesnt cancel the overall execution + |> Seq.map(fun (ip, op) -> ap.Execute(op, tags=mkTags ["clientIp",ip]) |> Async.Catch) + |> Async.Parallel + |> Stopwatch.Time + let oks, errs = Choice.partition results + test <@ 10+12 = Array.length oks + && time.Elapsed > s 2 && time.Elapsed < s 4 + && 2 = Array.length errs + && errs |> Array.forall (function :? Polly.Bulkhead.BulkheadRejectedException -> true | _ -> false) @> + let evnts = buffer.Take() + let queuedOrShed = function + | Call ("(default)",MaybeQueuing) as x -> Choice1Of3 x + | Deferred ("default","(default)","def",[tag],delay) as x -> Choice2Of3 (tag,delay) + | Shed ("default","(default)","def",[tag]) as x -> Choice3Of3 (tag,x) + | x -> failwithf "Unexpected event %A" x + let queued,waited,shed = evnts |> Seq.map queuedOrShed |> Choice.partition3 + let delayed = waited |> Array.filter (fun (_tag,x) -> x > ms 500) + test <@ 0 = Array.length queued // We're not triggering any queueing on the 'Limit' rule + && between 9 (24-10) (Array.length delayed) // Typically, 3+3+1+1+1 should get delayed, but depending on scheduling, some won't get logged as such, and we don't want a flickering test + && ['A'..'E'] |> Seq.forall (fun expected -> delayed |> Seq.exists (fun (tag,_delay) -> tag = ("clientIp",string expected))) + && 2 = Array.length shed @> } + type Cutoff(output : Xunit.Abstractions.ITestOutputHelper) = let log, buffer = LogHooks.createLoggerWithCapture output