Skip to content

Commit

Permalink
Actions checklist and leader's routine
Browse files Browse the repository at this point in the history
Here we are implementing the code responsible for building an actions
checklist that will be used to generate a proposal. We are also adding
an outline of the leader's routine.
  • Loading branch information
lukasz-zimnoch committed Nov 27, 2023
1 parent 9f7534b commit 558f5e6
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 14 deletions.
182 changes: 169 additions & 13 deletions pkg/tbtc/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"math/rand"
"sort"

"github.com/keep-network/keep-core/pkg/bitcoin"
"github.com/keep-network/keep-core/pkg/chain"
"github.com/keep-network/keep-core/pkg/generator"
"github.com/keep-network/keep-core/pkg/net"
"github.com/keep-network/keep-core/pkg/protocol/group"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
)

const (
Expand All @@ -39,6 +40,10 @@ const (
// hash can be used as an ingredient for the coordination seed, computed
// for the given coordination window.
coordinationSafeBlockShift = 32
// coordinationHeartbeatProbability is the probability of proposing a
// heartbeat action during the coordination procedure, assuming no other
// higher-priority action is proposed.
coordinationHeartbeatProbability = float64(0.125)
)

// errCoordinationExecutorBusy is an error returned when the coordination
Expand Down Expand Up @@ -81,6 +86,25 @@ func (cw *coordinationWindow) isAfter(other *coordinationWindow) bool {
return cw.coordinationBlock > other.coordinationBlock
}

// index returns the index of the coordination window. The index is computed
// by dividing the coordination block number by the coordination frequency.
// A valid index is a positive integer.
//
// For example:
// - window starting at block 900 has index 1
// - window starting at block 1800 has index 2
// - window starting at block 2700 has index 3
//
// If the coordination block number is not a multiple of the coordination
// frequency, the index is 0.
func (cw *coordinationWindow) index() uint64 {
if cw.coordinationBlock%coordinationFrequencyBlocks == 0 {
return cw.coordinationBlock / coordinationFrequencyBlocks
}

return 0
}

// watchCoordinationWindows watches for new coordination windows and runs
// the given callback when a new window is detected. The callback is run
// in a separate goroutine. It is guaranteed that the callback is not run
Expand All @@ -97,11 +121,11 @@ func watchCoordinationWindows(
for {
select {
case block := <-blocksChan:
if block%coordinationFrequencyBlocks == 0 {
if window := newCoordinationWindow(block); window.index() > 0 {
// Make sure the current window is not the same as the last one.
// There is no guarantee that the block channel will not emit
// the same block again.
if window := newCoordinationWindow(block); window.isAfter(lastWindow) {
if window.isAfter(lastWindow) {
lastWindow = window
// Run the callback in a separate goroutine to avoid blocking
// this loop and potentially missing the next block.
Expand Down Expand Up @@ -160,6 +184,17 @@ func (cf *coordinationFault) String() string {
)
}

// coordinationProposalGenerator is a function that generates a coordination
// proposal based on the given checklist of possible wallet actions.
// The checklist is a list of actions that should be checked for the given
// coordination window. The generator is expected to return a proposal
// for the first action from the checklist that is valid for the given
// wallet's state. If none of the actions are valid, the generator
// should return a noopProposal.
type coordinationProposalGenerator func(
actionsChecklist []WalletActionType,
) (coordinationProposal, error)

// coordinationProposal represents a single action proposal for the given wallet.
type coordinationProposal interface {
// actionType returns the specific type of the walletAction being subject
Expand Down Expand Up @@ -203,6 +238,16 @@ func (cr *coordinationResult) String() string {
)
}

// coordinationMessage represents a coordination message sent by the leader
// to their followers during the active phase of the coordination window.
type coordinationMessage struct {
// TODO: Add fields.
}

func (cm *coordinationMessage) Type() string {
return "tbtc/coordination_message"
}

// coordinationExecutor is responsible for executing the coordination
// procedure for the given wallet.
type coordinationExecutor struct {
Expand All @@ -214,9 +259,13 @@ type coordinationExecutor struct {
membersIndexes []group.MemberIndex
operatorAddress chain.Address

proposalGenerator coordinationProposalGenerator

broadcastChannel net.BroadcastChannel
membershipValidator *group.MembershipValidator
protocolLatch *generator.ProtocolLatch

waitForBlockFn waitForBlockFn
}

// newCoordinationExecutor creates a new coordination executor for the
Expand All @@ -226,19 +275,23 @@ func newCoordinationExecutor(
coordinatedWallet wallet,
membersIndexes []group.MemberIndex,
operatorAddress chain.Address,
proposalGenerator coordinationProposalGenerator,
broadcastChannel net.BroadcastChannel,
membershipValidator *group.MembershipValidator,
protocolLatch *generator.ProtocolLatch,
waitForBlockFn waitForBlockFn,
) *coordinationExecutor {
return &coordinationExecutor{
lock: semaphore.NewWeighted(1),
chain: chain,
coordinatedWallet: coordinatedWallet,
membersIndexes: membersIndexes,
operatorAddress: operatorAddress,
proposalGenerator: proposalGenerator,
broadcastChannel: broadcastChannel,
membershipValidator: membershipValidator,
protocolLatch: protocolLatch,
waitForBlockFn: waitForBlockFn,
}
}

Expand All @@ -263,26 +316,59 @@ func (ce *coordinationExecutor) coordinate(
ce.protocolLatch.Lock()
defer ce.protocolLatch.Unlock()

// Just in case, check if the window is valid.
if window.index() == 0 {
return nil, fmt.Errorf("invalid coordination window [%v]", window)
}

seed, err := ce.coordinationSeed(window)
if err != nil {
return nil, fmt.Errorf("failed to compute coordination seed: [%v]", err)
}

leader := ce.coordinationLeader(seed)

actionsChecklist := ce.actionsChecklist(window.index(), seed)

// Set up a context that is cancelled when the active phase of the
// coordination window ends.
ctx, cancelCtx := withCancelOnBlock(
context.Background(),
window.activePhaseEndBlock(),
ce.waitForBlockFn,
)
defer cancelCtx()

var proposal coordinationProposal
if leader == ce.operatorAddress {
ce.leaderRoutine()
proposal, err = ce.leaderRoutine(ctx, actionsChecklist)
if err != nil {
return nil, fmt.Errorf(
"failed to execute leader's routine: [%v]",
err,
)
}
} else {
ce.followerRoutine()
proposal, err = ce.followerRoutine()
if err != nil {
return nil, fmt.Errorf(
"failed to execute follower's routine: [%v]",
err,
)
}
}

// Just in case, if the proposal is nil, set it to noop.
if proposal == nil {
proposal = &noopProposal{}
}

// TODO: Implement the rest of the coordination procedure.
result := &coordinationResult{
wallet: ce.coordinatedWallet,
window: window,
leader: ce.coordinatedWallet.signingGroupOperators[0],
proposal: &noopProposal{},
faults: nil,
leader: leader,
proposal: proposal,
faults: nil, // TODO: Fill coordination faults.
}

return result, nil
Expand Down Expand Up @@ -350,10 +436,80 @@ func (ce *coordinationExecutor) coordinationLeader(seed [32]byte) chain.Address
return uniqueOperators[0]
}

func (ce *coordinationExecutor) leaderRoutine() {
// TODO: Implement the leader routine.
// actionsChecklist returns a list of wallet actions that should be checked
// for the given coordination window.
func (ce *coordinationExecutor) actionsChecklist(
windowIndex uint64,
seed [32]byte,
) []WalletActionType {
var actions []WalletActionType

// Redemption action is a priority action and should be checked on every
// coordination window.
actions = append(actions, ActionRedemption)

// Other actions should be checked with a lower frequency. The default
// frequency is every 16 coordination windows.
frequencyWindows := uint64(16)

// TODO: Increase frequency for the active wallet.
if windowIndex%frequencyWindows == 0 {
actions = append(actions, ActionDepositSweep)
}

if windowIndex%frequencyWindows == 0 {
actions = append(actions, ActionMovedFundsSweep)
}

// TODO: Increase frequency for old wallets.
if windowIndex%frequencyWindows == 0 {
actions = append(actions, ActionMovingFunds)
}

// #nosec G404 (insecure random number source (rand))
// Drawing a decision about heartbeat does not require secure randomness.
// Use first 8 bytes of the seed to initialize the RNG.
rng := rand.New(rand.NewSource(int64(binary.BigEndian.Uint64(seed[:8]))))
if rng.Float64() < coordinationHeartbeatProbability {
actions = append(actions, ActionHeartbeat)
}

return actions
}

// leaderRoutine executes the leader's routine for the given coordination
// window. The routine generates a proposal and broadcasts it to the followers.
// It returns the generated proposal or an error if the routine failed.
func (ce *coordinationExecutor) leaderRoutine(
ctx context.Context,
actionsChecklist []WalletActionType,
) (coordinationProposal, error) {
proposal, err := ce.proposalGenerator(actionsChecklist)
if err != nil {
return nil, fmt.Errorf("failed to generate proposal: [%v]", err)
}

message := &coordinationMessage{
// TODO: Initialize fields.
}

err = ce.broadcastChannel.Send(
ctx,
message,
net.BackoffRetransmissionStrategy,
)
if err != nil {
return nil, fmt.Errorf("failed to send coordination message: [%v]", err)
}

return proposal, nil
}

func (ce *coordinationExecutor) followerRoutine() {
// followerRoutine executes the follower's routine for the given coordination
// window. The routine listens for the coordination message from the leader and
// validates it. If the leader's proposal is valid, it returns the received
// proposal. Returns an error if the routine failed.
func (ce *coordinationExecutor) followerRoutine() (coordinationProposal, error) {
// TODO: Implement the follower routine.
return nil, nil
}
12 changes: 12 additions & 0 deletions pkg/tbtc/marshaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,18 @@ func (sdm *signingDoneMessage) Unmarshal(bytes []byte) error {
return nil
}

// Marshal converts the coordinationMessage to a byte array.
func (cm *coordinationMessage) Marshal() ([]byte, error) {
// TODO: Implement.
return nil, nil
}

// Unmarshal converts a byte array back to the coordinationMessage.
func (cm *coordinationMessage) Unmarshal(bytes []byte) error {
// TODO: Implement.
return nil
}

// marshalPublicKey converts an ECDSA public key to a byte
// array (uncompressed).
func marshalPublicKey(publicKey *ecdsa.PublicKey) ([]byte, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/tbtc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,9 @@ func (n *node) getCoordinationExecutor(
return nil, false, fmt.Errorf("failed to get broadcast channel: [%v]", err)
}

// TODO: Register unmarshalers
broadcastChannel.SetUnmarshaler(func() net.TaggedUnmarshaler {
return &coordinationMessage{}
})

membershipValidator := group.NewMembershipValidator(
executorLogger,
Expand Down Expand Up @@ -382,9 +384,11 @@ func (n *node) getCoordinationExecutor(
wallet,
membersIndexes,
operatorAddress,
nil, // TODO: Set a proper proposal generator.
broadcastChannel,
membershipValidator,
n.protocolLatch,
n.waitForBlockHeight,
)

n.coordinationExecutors[executorKey] = executor
Expand Down

0 comments on commit 558f5e6

Please sign in to comment.