Skip to content

Commit

Permalink
use records for event data
Browse files Browse the repository at this point in the history
  • Loading branch information
joemphilips committed Jan 2, 2022
1 parent 8e53590 commit 21af3be
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 141 deletions.
300 changes: 212 additions & 88 deletions NLoop.Domain/Swap.fs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions NLoop.Server/Actors/SwapActor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module private Observable =
obs
|> Observable.choose(
function
| Choice1Of2{ Event = Swap.Event.FinishedByError(_id, err) } -> err |> Error |> Some
| Choice1Of2{ Event = Swap.Event.FinishedByError { Error = err } } -> err |> Error |> Some
| Choice2Of2{ Error = DomainError e } -> e.Msg |> Error |> Some
| Choice2Of2{ Error = Store(StoreError e) } -> e |> Error |> Some
| Choice1Of2{ Event = e } -> selector e |> Option.map(Ok)
Expand Down Expand Up @@ -230,7 +230,7 @@ type SwapActor(broadcaster: IBroadcaster,
let! maybeClaimTxId =
let chooser =
if loopOut.AcceptZeroConf then
(function | Swap.Event.ClaimTxPublished txId -> Some (Some txId) | _ -> None)
(function | Swap.Event.ClaimTxPublished { Txid = txId } -> Some (Some txId) | _ -> None)
else
(function | Swap.Event.NewLoopOutAdded _ -> Some (None) | _ -> None)
obs
Expand Down
16 changes: 0 additions & 16 deletions NLoop.Server/LoopHandlers.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ open Microsoft.AspNetCore.Http
open FSharp.Control.Tasks
open Giraffe

[<RequireQualifiedAccess>]
module private Observable =

let inline awaitFirstErrorOrAsync
(selector: Swap.Event -> _ option)
(obs: IObservable<Choice<Swap.EventWithId, Swap.ErrorWithId>>) =
obs
|> Observable.choose(
function
| Choice1Of2{ Event = Swap.Event.FinishedByError(_id, err) } -> err |> Error |> Some
| Choice2Of2{ Error = e } -> e.ToString() |> Error |> Some
| Choice1Of2{ Event = e } -> selector e |> Option.map(Ok)
)
|> Observable.catchWith(fun ex -> Observable.Return(Error $"Error while handling observable {ex}"))
|> fun o -> o.FirstAsync().GetAwaiter() |> Async.AwaitCSharpAwaitable |> Async.StartAsTask


let handleLoopOutCore (req: LoopOutRequest) =
fun (next : HttpFunc) (ctx : HttpContext) ->
Expand Down
23 changes: 13 additions & 10 deletions NLoop.Server/ProcessManagers/SwapProcessManager.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace NLoop.Server.ProcessManagers

open System.Collections.Generic
open System.Threading.Tasks
open DotNetLightning.Utils.Primitives
open DotNetLightning.Payment
open FSharp.Control.Tasks
open FSharp.Control.Reactive
open LndClient
Expand All @@ -12,6 +12,7 @@ open NLoop.Domain.Utils
open NLoop.Server
open NLoop.Domain
open NLoop.Server.Actors
open FsToolkit.ErrorHandling

type SwapProcessManager(eventAggregator: IEventAggregator,
lightningClientProvider: ILightningClientProvider,
Expand All @@ -31,18 +32,20 @@ type SwapProcessManager(eventAggregator: IEventAggregator,
obs
|> Observable.choose(fun e ->
match e.Data with
| Swap.Event.OffChainOfferStarted(swapId, pairId, invoice, paymentParams) -> Some(swapId, pairId, invoice, paymentParams)
| Swap.Event.OffChainOfferStarted d ->
Some d
| _ -> None)
|> Observable.flatmapTask(fun (swapId, PairId(_, quoteAsset), invoice, paymentParams) ->
|> Observable.flatmapTask(fun ({ SwapId = swapId; PairId = pairId; Params = paymentParams } as data)->
task {
try
let invoice = data.Invoice |> ResultUtils.Result.deref
let! pr =
let req = {
SendPaymentRequest.Invoice = invoice
MaxFee = paymentParams.MaxFee
OutgoingChannelIds = paymentParams.OutgoingChannelIds
}
lightningClientProvider.GetClient(quoteAsset).Offer(req).ConfigureAwait(false)
lightningClientProvider.GetClient(pairId.Quote).Offer(req).ConfigureAwait(false)

match pr with
| Ok s ->
Expand All @@ -62,8 +65,8 @@ type SwapProcessManager(eventAggregator: IEventAggregator,
obs
|> Observable.subscribe(fun e ->
match e.Data with
| Swap.Event.NewLoopOutAdded(_, { Id = swapId })
| Swap.Event.NewLoopInAdded(_, { Id = swapId }) ->
| Swap.Event.NewLoopOutAdded { LoopOut = { Id = swapId } }
| Swap.Event.NewLoopInAdded { LoopIn = { Id = swapId } } ->
// TODO: re-registering everything from start is not very performant nor scalable.
// Ideally we should register only the one which is not finished.
logger.LogDebug($"Registering new Swap {swapId}")
Expand All @@ -73,10 +76,10 @@ type SwapProcessManager(eventAggregator: IEventAggregator,
with
| ex ->
logger.LogError $"Failed to register swap (id: {swapId}) {ex}"
| Swap.Event.FinishedSuccessfully swapId
| Swap.Event.FinishedByRefund swapId
| Swap.Event.FinishedByTimeout(swapId, _)
| Swap.Event.FinishedByError (swapId, _) ->
| Swap.Event.FinishedSuccessfully { Id = swapId }
| Swap.Event.FinishedByRefund { Id = swapId }
| Swap.Event.FinishedByTimeout { Id = swapId }
| Swap.Event.FinishedByError { Id = swapId } ->
logger.LogDebug($"Removing Finished Swap {swapId}")
try
for l in listeners do
Expand Down
5 changes: 3 additions & 2 deletions NLoop.Server/Projections/OnGoingSwapStateProjection.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type OnGoingSwapStateProjection(loggerFactory: ILoggerFactory,
let nextState = actor.Aggregate.Apply s r.Data
let startHeight =
match r.Data with
| Swap.Event.NewLoopOutAdded(h, _)
| Swap.Event.NewLoopInAdded(h, _) -> h | _ -> h
| Swap.Event.NewLoopOutAdded { Height = h }
| Swap.Event.NewLoopInAdded { Height = h } -> h
| _ -> h
(startHeight, nextState)
))
// We don't hold finished swaps on-memory for the scalability.
Expand Down
4 changes: 2 additions & 2 deletions NLoop.Server/Projections/RecentSwapFailureProjection.fs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type RecentSwapFailureProjection(opts: IOptions<NLoopOptions>,
| Error _ -> ()
| Ok r ->
match r.Data with
| Swap.Event.NewLoopOutAdded(_h, o) ->
| Swap.Event.NewLoopOutAdded { LoopOut = o } ->
this.FailedLoopOutSwapState <-
this.FailedLoopOutSwapState
|> Map.add re.StreamId (o.OutgoingChanIds, ValueNone)
Expand All @@ -60,7 +60,7 @@ type RecentSwapFailureProjection(opts: IOptions<NLoopOptions>,
this.FailedLoopOutSwapState <-
this.FailedLoopOutSwapState
|> dropOldest
| Swap.Event.NewLoopInAdded(_h, i) ->
| Swap.Event.NewLoopInAdded { LoopIn = i } ->
i.LastHop
|> Option.iter(fun lastHop ->
this.FailedLoopInSwapState <-
Expand Down
55 changes: 34 additions & 21 deletions tests/NLoop.Domain.Tests/SwapTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ type SwapDomainTests() =
|> Result.map handler.Reconstitute

let assertNotUnknownEvent (e: ESEvent<_>) =
e.Data |> function | Swap.Event.UnknownTagEvent(t, _) -> failwith $"unknown tag {t}" | _ -> e
e.Data |> function | Swap.Event.UnknownTagEvent { Tag = t } -> failwith $"unknown tag {t}" | _ -> e
let getLastEvent e =
e
|> Result.deref
Expand All @@ -312,9 +312,9 @@ type SwapDomainTests() =
[<Fact>]
member this.JsonSerializerTest() =
let events = [
Swap.Event.FinishedByError(SwapId("foo"), "Error msg")
Swap.Event.ClaimTxPublished(uint256.Zero)
Swap.Event.TheirSwapTxPublished(Network.RegTest.CreateTransaction().ToHex())
Swap.Event.FinishedByError { Id = SwapId("foo"); Error = "Error msg" }
Swap.Event.ClaimTxPublished { Txid = uint256.Zero }
Swap.Event.TheirSwapTxPublished { TxHex = Network.RegTest.CreateTransaction().ToHex() }
]

for e in events do
Expand All @@ -324,14 +324,14 @@ type SwapDomainTests() =

[<Property(MaxTest=10)>]
member this.JsonSerializerTest_LoopIn(loopIn: LoopIn, height: uint32) =
let e = Swap.Event.NewLoopInAdded(height |> BlockHeight, loopIn)
let e = Swap.Event.NewLoopInAdded { Height = height |> BlockHeight; LoopIn = loopIn }
let ser = Swap.serializer
let e2 = ser.EventToBytes(e) |> ser.BytesToEvents
Assertion.isOk(e2)

[<Property(MaxTest=10)>]
member this.JsonSerializerTest_LoopOut(loopOut: LoopOut, height: uint32) =
let e = Swap.Event.NewLoopOutAdded(height |> BlockHeight,loopOut)
let e = Swap.Event.NewLoopOutAdded { Height = height |> BlockHeight; LoopOut = loopOut }
let ser = Swap.serializer
let e2 = ser.EventToBytes(e) |> ser.BytesToEvents
Assertion.isOk(e2)
Expand All @@ -350,6 +350,18 @@ type SwapDomainTests() =
let se2 = ser |> Result.deref
Assert.Equal(se, se2)

[<Property(MaxTest=200)>]
member this.SwapEventSerialize(e: Swap.Event) =
match e with
| Swap.Event.UnknownTagEvent { Tag = t } when Swap.Event.KnownTags |> Array.contains t ->
()
| _ ->
let ser = Swap.serializer
let b = ser.EventToBytes e |> ser.BytesToEvents
Assertion.isOk(b)
let e2 = b |> Result.deref
Assert.Equal(e, e2)

[<Property(MaxTest=10)>]
member this.TestNewLoopOut(loopOut: LoopOut, loopOutParams: Swap.LoopOutParams) =
let loopOut = {
Expand Down Expand Up @@ -442,15 +454,15 @@ type SwapDomainTests() =
]

|> commandsToEvents
Assert.Contains(Swap.Event.TheirSwapTxPublished(swapTx.ToHex()), events |> Result.deref |> List.map(fun e -> e.Data))
Assert.Contains(Swap.Event.TheirSwapTxPublished { TxHex = swapTx.ToHex() }, events |> Result.deref |> List.map(fun e -> e.Data))

let lastEvent =
events |> getLastEvent
let expected =
if loopOut.AcceptZeroConf then
Swap.Event.ClaimTxPublished(null).Type
Swap.Event.ClaimTxPublished({ Txid = null }).Type
else
Swap.Event.TheirSwapTxPublished(null).Type
Swap.Event.TheirSwapTxPublished({ TxHex = null }).Type
Assert.Equal(expected, lastEvent.Data.Type)

let genesis =
Expand All @@ -465,11 +477,11 @@ type SwapDomainTests() =
(Swap.Command.NewBlock(b1, loopOut.PairId.Base))
]
|> commandsToEvents
Assert.Contains(Swap.Event.TheirSwapTxConfirmedFirstTime({| BlockHash = b1.Block.Header.GetHash(); Height = b1.Height |}),
Assert.Contains(Swap.Event.TheirSwapTxConfirmedFirstTime({ BlockHash = b1.Block.Header.GetHash(); Height = b1.Height }),
events |> Result.deref |> List.map(fun e -> e.Data))
if loopOut.AcceptZeroConf then
let lastEvent = events |> getLastEvent
let expected = Swap.Event.ClaimTxPublished(null).Type
let expected = Swap.Event.ClaimTxPublished({ Txid = null }).Type
Assert.Equal(expected, lastEvent.Data.Type)

let b2 =
Expand All @@ -487,7 +499,7 @@ type SwapDomainTests() =
|> Result.deref

let expected =
Swap.Event.ClaimTxPublished(null).Type
Swap.Event.ClaimTxPublished({ Txid = null }).Type
Assert.Contains(expected, events |> List.map(fun e -> e.Data.Type))

let claimTx =
Expand All @@ -505,7 +517,7 @@ type SwapDomainTests() =
let sweepAmount = loopOut.OnChainAmount - serverFee.ToMoney()
let expected =
let sweepTxId = txBroadcasted |> Seq.last |> fun t -> t.GetHash()
Swap.Event.ClaimTxConfirmed(b4.Block.Header.GetHash(), sweepTxId, sweepAmount)
Swap.Event.ClaimTxConfirmed { BlockHash = b4.Block.Header.GetHash(); TxId = sweepTxId; SweepAmount = sweepAmount }
Assert.Equal(expected.Type, lastEvent.Data.Type)
Assert.True(Money.Zero < sweepAmount && sweepAmount < loopOut.OnChainAmount)
let lastEvent =
Expand All @@ -518,7 +530,7 @@ type SwapDomainTests() =
]
|> commandsToEvents
|> getLastEvent
Assert.Equal(Swap.Event.FinishedSuccessfully(loopOut.Id), lastEvent.Data)
Assert.Equal(Swap.Event.FinishedSuccessfully { Id = loopOut.Id }, lastEvent.Data)

static member TestLoopOut_Reorg_TestData =
let loopOut = testLoopOut1
Expand Down Expand Up @@ -669,7 +681,7 @@ type SwapDomainTests() =
Assertion.isOk events
let swapTx = Assert.Single(txBroadcasted)
let lastEvent = events |> getLastEvent
Assert.Equal(Swap.Event.OurSwapTxPublished(Money.Zero, swapTx.ToHex(), 0u).Type, lastEvent.Data.Type)
Assert.Equal(Swap.Event.OurSwapTxPublished({ Fee = Money.Zero; TxHex = swapTx.ToHex(); HtlcOutIndex = 0u }).Type, lastEvent.Data.Type)

// act
let b1 =
Expand All @@ -687,14 +699,14 @@ type SwapDomainTests() =
|> commandsToEvents
// assert
Assert.Contains(e2 |> Result.deref,
fun e -> e.Data.Type = Swap.Event.OurSwapTxConfirmed(b1.Block.Header.GetHash(), uint256.Zero, 0u).Type)
fun e -> e.Data.Type = (Swap.Event.OurSwapTxConfirmed { BlockHash = b1.Block.Header.GetHash(); TxId = uint256.Zero; HTlcOutIndex = 0u }).Type)
let lastEvent = e2 |> getLastEvent
Assert.Equal(Swap.Event.RefundTxPublished(uint256.Zero).Type, lastEvent.Data.Type)
Assert.Equal(Swap.Event.RefundTxPublished({ TxId = uint256.Zero }).Type, lastEvent.Data.Type)

let b3 =
let b = b2.CreateNext(pubkey3.WitHash.GetAddress(loopIn.QuoteAssetNetwork))
let refundTx =
let refundTxId = lastEvent.Data |> function | Swap.Event.RefundTxPublished refundTxId -> refundTxId | _ -> failwith "unreachable"
let refundTxId = lastEvent.Data |> function | Swap.Event.RefundTxPublished { TxId = id } -> id | _ -> failwith "unreachable"
txBroadcasted |> Seq.find(fun tx -> tx.GetHash() = refundTxId)
b.Block.AddTransaction(refundTx) |> ignore
b
Expand All @@ -707,7 +719,7 @@ type SwapDomainTests() =
|> commandsToEvents
|> getLastEvent
// assert
Assert.Equal(Swap.Event.FinishedByRefund(loopIn.Id).Type, lastEvent.Data.Type)
Assert.Equal(Swap.Event.FinishedByRefund({ Id = loopIn.Id }).Type, lastEvent.Data.Type)

[<Property(MaxTest=10)>]
member this.TestLoopIn_Success(loopIn: LoopIn, testAltcoin: bool) =
Expand Down Expand Up @@ -772,7 +784,8 @@ type SwapDomainTests() =
Assertion.isOk events
let swapTx = Assert.Single(txBroadcasted) // swap tx and refund tx
let lastEvent = events |> getLastEvent
Assert.Equal(Swap.Event.OurSwapTxPublished(Money.Zero, swapTx.ToHex(), 0u).Type, lastEvent.Data.Type)
Assert.Equal(Swap.Event.OurSwapTxPublished({ Fee = Money.Zero; TxHex = swapTx.ToHex(); HtlcOutIndex = 0u }).Type,
lastEvent.Data.Type)

let b0 =
let b = loopIn.GetGenesis()
Expand Down Expand Up @@ -804,5 +817,5 @@ type SwapDomainTests() =
|> getLastEvent

// assert
Assert.Equal(Swap.Event.FinishedSuccessfully(loopIn.Id).Type, lastEvent.Data.Type)
Assert.Equal(Swap.Event.FinishedSuccessfully( { Id = loopIn.Id }).Type, lastEvent.Data.Type)

1 change: 1 addition & 0 deletions tests/NLoop.Server.Tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

tests with "Docker" trait must be run after launching docker-compose
manually, run

```sh
docker-compose up
./cliutils/prepare_tx_for_fee.sh
Expand Down

0 comments on commit 21af3be

Please sign in to comment.