diff --git a/pkg/tbtc/coordination.go b/pkg/tbtc/coordination.go index 90b5d97ff8..45b0cc6b07 100644 --- a/pkg/tbtc/coordination.go +++ b/pkg/tbtc/coordination.go @@ -5,14 +5,15 @@ import ( "crypto/sha256" "encoding/binary" "fmt" + "math/rand" + "sort" + "github.com/keep-network/keep-core/pkg/bitcoin" "github.com/keep-network/keep-core/pkg/chain" "github.com/keep-network/keep-core/pkg/generator" "github.com/keep-network/keep-core/pkg/net" "github.com/keep-network/keep-core/pkg/protocol/group" "golang.org/x/sync/semaphore" - "math/rand" - "sort" ) const ( @@ -39,6 +40,10 @@ const ( // hash can be used as an ingredient for the coordination seed, computed // for the given coordination window. coordinationSafeBlockShift = 32 + // coordinationHeartbeatProbability is the probability of proposing a + // heartbeat action during the coordination procedure, assuming no other + // higher-priority action is proposed. + coordinationHeartbeatProbability = float64(0.125) ) // errCoordinationExecutorBusy is an error returned when the coordination @@ -81,6 +86,25 @@ func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool { return cw.coordinationBlock > other.coordinationBlock } +// index returns the index of the coordination window. The index is computed +// by dividing the coordination block number by the coordination frequency. +// A valid index is a positive integer. +// +// For example: +// - window starting at block 900 has index 1 +// - window starting at block 1800 has index 2 +// - window starting at block 2700 has index 3 +// +// If the coordination block number is not a multiple of the coordination +// frequency, the index is 0. +func (cw *coordinationWindow) index() uint64 { + if cw.coordinationBlock%coordinationFrequencyBlocks == 0 { + return cw.coordinationBlock / coordinationFrequencyBlocks + } + + return 0 +} + // watchCoordinationWindows watches for new coordination windows and runs // the given callback when a new window is detected. The callback is run // in a separate goroutine. It is guaranteed that the callback is not run @@ -97,11 +121,11 @@ func watchCoordinationWindows( for { select { case block := <-blocksChan: - if block%coordinationFrequencyBlocks == 0 { + if window := newCoordinationWindow(block); window.index() > 0 { // Make sure the current window is not the same as the last one. // There is no guarantee that the block channel will not emit // the same block again. - if window := newCoordinationWindow(block); window.isAfter(lastWindow) { + if window.isAfter(lastWindow) { lastWindow = window // Run the callback in a separate goroutine to avoid blocking // this loop and potentially missing the next block. @@ -160,6 +184,17 @@ func (cf *coordinationFault) String() string { ) } +// coordinationProposalGenerator is a function that generates a coordination +// proposal based on the given checklist of possible wallet actions. +// The checklist is a list of actions that should be checked for the given +// coordination window. The generator is expected to return a proposal +// for the first action from the checklist that is valid for the given +// wallet's state. If none of the actions are valid, the generator +// should return a noopProposal. +type coordinationProposalGenerator func( + actionsChecklist []WalletActionType, +) (coordinationProposal, error) + // coordinationProposal represents a single action proposal for the given wallet. type coordinationProposal interface { // actionType returns the specific type of the walletAction being subject @@ -203,6 +238,16 @@ func (cr *coordinationResult) String() string { ) } +// coordinationMessage represents a coordination message sent by the leader +// to their followers during the active phase of the coordination window. +type coordinationMessage struct { + // TODO: Add fields. +} + +func (cm *coordinationMessage) Type() string { + return "tbtc/coordination_message" +} + // coordinationExecutor is responsible for executing the coordination // procedure for the given wallet. type coordinationExecutor struct { @@ -214,9 +259,13 @@ type coordinationExecutor struct { membersIndexes []group.MemberIndex operatorAddress chain.Address + proposalGenerator coordinationProposalGenerator + broadcastChannel net.BroadcastChannel membershipValidator *group.MembershipValidator protocolLatch *generator.ProtocolLatch + + waitForBlockFn waitForBlockFn } // newCoordinationExecutor creates a new coordination executor for the @@ -226,9 +275,11 @@ func newCoordinationExecutor( coordinatedWallet wallet, membersIndexes []group.MemberIndex, operatorAddress chain.Address, + proposalGenerator coordinationProposalGenerator, broadcastChannel net.BroadcastChannel, membershipValidator *group.MembershipValidator, protocolLatch *generator.ProtocolLatch, + waitForBlockFn waitForBlockFn, ) *coordinationExecutor { return &coordinationExecutor{ lock: semaphore.NewWeighted(1), @@ -236,9 +287,11 @@ func newCoordinationExecutor( coordinatedWallet: coordinatedWallet, membersIndexes: membersIndexes, operatorAddress: operatorAddress, + proposalGenerator: proposalGenerator, broadcastChannel: broadcastChannel, membershipValidator: membershipValidator, protocolLatch: protocolLatch, + waitForBlockFn: waitForBlockFn, } } @@ -263,6 +316,11 @@ func (ce *coordinationExecutor) coordinate( ce.protocolLatch.Lock() defer ce.protocolLatch.Unlock() + // Just in case, check if the window is valid. + if window.index() == 0 { + return nil, fmt.Errorf("invalid coordination window [%v]", window) + } + seed, err := ce.coordinationSeed(window) if err != nil { return nil, fmt.Errorf("failed to compute coordination seed: [%v]", err) @@ -270,19 +328,47 @@ func (ce *coordinationExecutor) coordinate( leader := ce.coordinationLeader(seed) + actionsChecklist := ce.actionsChecklist(window.index(), seed) + + // Set up a context that is cancelled when the active phase of the + // coordination window ends. + ctx, cancelCtx := withCancelOnBlock( + context.Background(), + window.activePhaseEndBlock(), + ce.waitForBlockFn, + ) + defer cancelCtx() + + var proposal coordinationProposal if leader == ce.operatorAddress { - ce.leaderRoutine() + proposal, err = ce.leaderRoutine(ctx, actionsChecklist) + if err != nil { + return nil, fmt.Errorf( + "failed to execute leader's routine: [%v]", + err, + ) + } } else { - ce.followerRoutine() + proposal, err = ce.followerRoutine() + if err != nil { + return nil, fmt.Errorf( + "failed to execute follower's routine: [%v]", + err, + ) + } + } + + // Just in case, if the proposal is nil, set it to noop. + if proposal == nil { + proposal = &noopProposal{} } - // TODO: Implement the rest of the coordination procedure. result := &coordinationResult{ wallet: ce.coordinatedWallet, window: window, - leader: ce.coordinatedWallet.signingGroupOperators[0], - proposal: &noopProposal{}, - faults: nil, + leader: leader, + proposal: proposal, + faults: nil, // TODO: Fill coordination faults. } return result, nil @@ -350,10 +436,80 @@ func (ce *coordinationExecutor) coordinationLeader(seed [32]byte) chain.Address return uniqueOperators[0] } -func (ce *coordinationExecutor) leaderRoutine() { - // TODO: Implement the leader routine. +// actionsChecklist returns a list of wallet actions that should be checked +// for the given coordination window. +func (ce *coordinationExecutor) actionsChecklist( + windowIndex uint64, + seed [32]byte, +) []WalletActionType { + var actions []WalletActionType + + // Redemption action is a priority action and should be checked on every + // coordination window. + actions = append(actions, ActionRedemption) + + // Other actions should be checked with a lower frequency. The default + // frequency is every 16 coordination windows. + frequencyWindows := uint64(16) + + // TODO: Increase frequency for the active wallet. + if windowIndex%frequencyWindows == 0 { + actions = append(actions, ActionDepositSweep) + } + + if windowIndex%frequencyWindows == 0 { + actions = append(actions, ActionMovedFundsSweep) + } + + // TODO: Increase frequency for old wallets. + if windowIndex%frequencyWindows == 0 { + actions = append(actions, ActionMovingFunds) + } + + // #nosec G404 (insecure random number source (rand)) + // Drawing a decision about heartbeat does not require secure randomness. + // Use first 8 bytes of the seed to initialize the RNG. + rng := rand.New(rand.NewSource(int64(binary.BigEndian.Uint64(seed[:8])))) + if rng.Float64() < coordinationHeartbeatProbability { + actions = append(actions, ActionHeartbeat) + } + + return actions +} + +// leaderRoutine executes the leader's routine for the given coordination +// window. The routine generates a proposal and broadcasts it to the followers. +// It returns the generated proposal or an error if the routine failed. +func (ce *coordinationExecutor) leaderRoutine( + ctx context.Context, + actionsChecklist []WalletActionType, +) (coordinationProposal, error) { + proposal, err := ce.proposalGenerator(actionsChecklist) + if err != nil { + return nil, fmt.Errorf("failed to generate proposal: [%v]", err) + } + + message := &coordinationMessage{ + // TODO: Initialize fields. + } + + err = ce.broadcastChannel.Send( + ctx, + message, + net.BackoffRetransmissionStrategy, + ) + if err != nil { + return nil, fmt.Errorf("failed to send coordination message: [%v]", err) + } + + return proposal, nil } -func (ce *coordinationExecutor) followerRoutine() { +// followerRoutine executes the follower's routine for the given coordination +// window. The routine listens for the coordination message from the leader and +// validates it. If the leader's proposal is valid, it returns the received +// proposal. Returns an error if the routine failed. +func (ce *coordinationExecutor) followerRoutine() (coordinationProposal, error) { // TODO: Implement the follower routine. + return nil, nil } diff --git a/pkg/tbtc/marshaling.go b/pkg/tbtc/marshaling.go index 93eeb72dcb..1c992e539a 100644 --- a/pkg/tbtc/marshaling.go +++ b/pkg/tbtc/marshaling.go @@ -125,6 +125,18 @@ func (sdm *signingDoneMessage) Unmarshal(bytes []byte) error { return nil } +// Marshal converts the coordinationMessage to a byte array. +func (cm *coordinationMessage) Marshal() ([]byte, error) { + // TODO: Implement. + return nil, nil +} + +// Unmarshal converts a byte array back to the coordinationMessage. +func (cm *coordinationMessage) Unmarshal(bytes []byte) error { + // TODO: Implement. + return nil +} + // marshalPublicKey converts an ECDSA public key to a byte // array (uncompressed). func marshalPublicKey(publicKey *ecdsa.PublicKey) ([]byte, error) { diff --git a/pkg/tbtc/node.go b/pkg/tbtc/node.go index 54b79c0a9e..746dddccce 100644 --- a/pkg/tbtc/node.go +++ b/pkg/tbtc/node.go @@ -348,7 +348,9 @@ func (n *node) getCoordinationExecutor( return nil, false, fmt.Errorf("failed to get broadcast channel: [%v]", err) } - // TODO: Register unmarshalers + broadcastChannel.SetUnmarshaler(func() net.TaggedUnmarshaler { + return &coordinationMessage{} + }) membershipValidator := group.NewMembershipValidator( executorLogger, @@ -382,9 +384,11 @@ func (n *node) getCoordinationExecutor( wallet, membersIndexes, operatorAddress, + nil, // TODO: Set a proper proposal generator. broadcastChannel, membershipValidator, n.protocolLatch, + n.waitForBlockHeight, ) n.coordinationExecutors[executorKey] = executor