Skip to content

Commit

Permalink
Tidying/commenting
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 5, 2019
1 parent 4a3de5a commit d962230
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,33 +1,38 @@
namespace Propulsion.Cosmos

open Microsoft.Azure.Documents
open Propulsion.Streams

/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by Propulsion.Streams
/// <remarks>
/// NOTE until `tip-isa-batch` gets merged, this causes a null-traversal of `-1`-index pages that presently do not contain data.
/// This is intentional in the name of forward compatibility for projectors - enabling us to upgrade the data format without necessitating
/// updates of all projectors (even if there can potentially be significant at-least-once-ness to the delivery).</remarks>
[<RequireQualifiedAccess>]
module DocumentParser =
module EquinoxCosmosParser =
type Document with
member document.Cast<'T>() =
let tmp = new Document()
tmp.SetPropertyValue("content", document)
tmp.GetPropertyValue<'T>("content")
/// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch
let isEquinoxBatch (d : Document) =
d.GetPropertyValue "p" <> null && d.GetPropertyValue "i" <> null
&& d.GetPropertyValue "n" <> null && d.GetPropertyValue "e" <> null
/// Maps fields in an Event within an Equinox.Cosmos V1+ Event (in a Batch or Tip) to the interface defined by the default Codec
let (|CodecEvent|) (x: Equinox.Cosmos.Store.Event) =
{ new Propulsion.Streams.IEvent<_> with

let (|PropulsionEvent|) (x: Equinox.Cosmos.Store.Event) =
{ new IEvent<_> with
member __.EventType = x.c
member __.Data = x.d
member __.Meta = x.m
member __.Timestamp = x.t }

/// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch
let isEquinoxBatch (d : Document) =
d.GetPropertyValue "p" <> null && d.GetPropertyValue "i" <> null
&& d.GetPropertyValue "n" <> null && d.GetPropertyValue "e" <> null

/// Enumerates the events represented within a batch
// NOTE until `tip-isa-batch` gets merged, this causes a null-traversal of `-1`-index pages which presently do not contain data
// This is intentional in the name of forward compatibility for projectors - enabling us to upgrade the data format without necessitating
// updates of all projectors (even if there can potentially be significant at-least-once-ness to the delivery)
let enumEquinoxCosmosEvents (batch : Equinox.Cosmos.Store.Batch) =
batch.e |> Seq.mapi (fun offset (CodecEvent x) -> { stream = batch.p; index = batch.i + int64 offset; event = x } : Propulsion.Streams.StreamEvent<_>)
let enumEquinoxCosmosEvents (batch : Equinox.Cosmos.Store.Batch) : StreamEvent<_> seq =
batch.e |> Seq.mapi (fun offset (PropulsionEvent x) -> { stream = batch.p; index = batch.i + int64 offset; event = x })

/// Collects all events within a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumEvents (d : Document) : Propulsion.Streams.StreamEvent<_> seq =
let enumStreamEvents (d : Document) : StreamEvent<_> seq =
if isEquinoxBatch d then d.Cast<Equinox.Cosmos.Store.Batch>() |> enumEquinoxCosmosEvents
else Seq.empty
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="DocumentParser.fs" />
<Compile Include="EquinoxCosmosParser.fs" />
<Compile Include="Infrastructure.fs" />
<Compile Include="ChangeFeedProcessor.fs" />
<Compile Include="CosmosSource.fs" />
Expand Down
17 changes: 12 additions & 5 deletions src/Propulsion.Kafka/Codec.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ namespace Propulsion.Kafka.Codec

open Newtonsoft.Json
open Newtonsoft.Json.Linq
open Propulsion.Streams

/// Manages injecting prepared json into the data being submitted to DocDb as-is, on the basis we can trust it to be valid json as DocDb will need it to be
// NB this code is cloned from the Equinox repo and should remain in sync with that - there are tests pinning various behaviors to go with it there
type VerbatimUtf8JsonConverter() =
inherit JsonConverter()

Expand Down Expand Up @@ -38,6 +40,7 @@ type [<NoEquality; NoComparison>] RenderedEvent =
[<JsonConverter(typeof<VerbatimUtf8JsonConverter>)>]
[<JsonProperty(Required=Required.Default, NullValueHandling=NullValueHandling.Ignore)>]
m: byte[] }

interface Propulsion.Streams.IEvent<byte[]> with
member __.EventType = __.c
member __.Data = __.d
Expand All @@ -55,12 +58,16 @@ type [<NoEquality; NoComparison>] RenderedSpan =
/// The Events comprising this span
e: RenderedEvent[] }

/// Helpers for mapping to/from `Equinox.Codec` types
/// Helpers for mapping to/from `Propulsion.Streams` canonical event types
module RenderedSpan =
let ofStreamSpan (stream : string) (span : Propulsion.Streams.StreamSpan<_>) : RenderedSpan =

let ofStreamSpan (stream : string) (span : StreamSpan<_>) : RenderedSpan =
{ s = stream
i = span.index
e = [| for x in span.events -> { c = x.EventType; t = x.Timestamp; d = x.Data; m = x.Meta } |] }
let enumEvents (span : RenderedSpan) : seq<Propulsion.Streams.IEvent<byte[]>> = Seq.cast span.e
let enumStreamEvents (span: RenderedSpan) : seq<Propulsion.Streams.StreamEvent<_>> =
e = span.events |> Array.map (fun x -> { c = x.EventType; t = x.Timestamp; d = x.Data; m = x.Meta }) }

let enumEvents (span : RenderedSpan) : IEvent<byte[]> seq =
Seq.cast span.e

let enumStreamEvents (span: RenderedSpan) : StreamEvent<_> seq =
enumEvents span |> Seq.mapi (fun i e -> { stream = span.s; index = span.i + int64 i; event = e })
1 change: 1 addition & 0 deletions src/Propulsion/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Option =
[<AutoOpen>]
module private AsyncHelpers =
type Async with
static member Sleep(t : TimeSpan) : Async<unit> = Async.Sleep(int t.TotalMilliseconds)
static member AwaitTaskCorrect (task : Task<'T>) : Async<'T> =
Async.FromContinuations <| fun (k,ek,_) ->
task.ContinueWith (fun (t:Task<'T>) ->
Expand Down
16 changes: 7 additions & 9 deletions src/Propulsion/Ingestion.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ open System.Threading
/// - Each write attempt is always of the newest token (each update is assumed to also count for all preceding ones)
/// - retries until success or a new item is posted
type ProgressWriter<'Res when 'Res: equality>(?period,?sleep) =
let sleepSpan = defaultArg sleep (TimeSpan.FromMilliseconds 100.)
let writeInterval,sleepPeriod = defaultArg period (TimeSpan.FromSeconds 5.), int sleepSpan.TotalMilliseconds
let writeInterval,sleepPeriod = defaultArg period (TimeSpan.FromSeconds 5.), defaultArg sleep (TimeSpan.FromMilliseconds 100.)
let due = intervalCheck writeInterval
let mutable committedEpoch = None
let mutable validatedPos = None
Expand Down Expand Up @@ -41,15 +40,15 @@ type private InternalMessage =
| Added of streams: int * events: int

type private Stats(log : ILogger, statsInterval : TimeSpan) =
let mutable validatedEpoch, comittedEpoch : int64 option * int64 option = None, None
let mutable validatedEpoch, committedEpoch : int64 option * int64 option = None, None
let progCommitFails, progCommits = ref 0, ref 0
let cycles, batchesPended, streamsPended, eventsPended = ref 0, ref 0, ref 0, ref 0
let statsDue = intervalCheck statsInterval
let dumpStats (activeReads, maxReads) =
log.Information("Buffering Cycles {cycles} Ingested {batches} ({streams:n0}s {events:n0}e)", !cycles, !batchesPended, !streamsPended, !eventsPended)
cycles := 0; batchesPended := 0; streamsPended := 0; eventsPended := 0
if !progCommitFails <> 0 || !progCommits <> 0 then
match comittedEpoch with
match committedEpoch with
| None ->
log.Error("Uncommitted {activeReads}/{maxReads} @ {validated}; writing failing: {failures} failures ({commits} successful commits)",
activeReads, maxReads, Option.toNullable validatedEpoch, !progCommitFails, !progCommits)
Expand All @@ -62,13 +61,13 @@ type private Stats(log : ILogger, statsInterval : TimeSpan) =
progCommits := 0; progCommitFails := 0
else
log.Information("Uncommitted {activeReads}/{maxReads} @ {validated} (committed: {committed})",
activeReads, maxReads, Option.toNullable validatedEpoch, Option.toNullable comittedEpoch)
activeReads, maxReads, Option.toNullable validatedEpoch, Option.toNullable committedEpoch)
member __.Handle : InternalMessage -> unit = function
| Validated epoch ->
validatedEpoch <- Some epoch
| ProgressResult (Choice1Of2 epoch) ->
incr progCommits
comittedEpoch <- Some epoch
committedEpoch <- Some epoch
| ProgressResult (Choice2Of2 (_exn : exn)) ->
incr progCommitFails
| Added (streams,events) ->
Expand All @@ -88,7 +87,6 @@ type Ingester<'Items,'Batch> private
makeBatch : (unit->unit) -> 'Items -> ('Batch * (int * int)),
submit : 'Batch -> unit,
cts : CancellationTokenSource) =
let sleepInterval = int sleepInterval.TotalMilliseconds
let maxRead = Sem maxRead
let incoming = new ConcurrentQueue<_>()
let messages = new ConcurrentQueue<InternalMessage>()
Expand Down Expand Up @@ -128,7 +126,7 @@ type Ingester<'Items,'Batch> private
if not worked then do! Async.Sleep sleepInterval
with e -> log.Error(e, "Ingester exception") }

/// Starts an independent Task which handles
/// Starts an independent Task that handles
/// a) `unpack`ing of `incoming` items
/// b) `submit`ting them onward (assuming there is capacity within the `readLimit`)
static member Start<'Item>(log, maxRead, makeBatch, submit, ?statsInterval, ?sleepInterval) =
Expand All @@ -149,4 +147,4 @@ type Ingester<'Items,'Batch> private
return maxRead.State }

/// As range assignments get revoked, a user is expected to `Stop` the active processing thread for the Ingester before releasing references to it
member __.Stop() = cts.Cancel()
member __.Stop() = cts.Cancel()

0 comments on commit d962230

Please sign in to comment.