Skip to content

Commit

Permalink
wait for forfeits
Browse files Browse the repository at this point in the history
  • Loading branch information
sekulicd committed Feb 6, 2025
1 parent a9b2b18 commit 8a14c54
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
23 changes: 17 additions & 6 deletions server/internal/core/application/covenantless.go
Original file line number Diff line number Diff line change
Expand Up @@ -1115,14 +1115,15 @@ func (s *covenantlessService) startRound() {
s.currentRound = round

defer func() {
time.Sleep(time.Duration(s.roundInterval/3) * time.Second)
s.startFinalization()
roundEndTime := time.Now().Add(time.Duration(s.roundInterval) * time.Second)
time.Sleep(time.Duration(s.roundInterval/6) * time.Second)
s.startFinalization(roundEndTime)
}()

log.Debugf("started registration stage for new round: %s", round.Id)
}

func (s *covenantlessService) startFinalization() {
func (s *covenantlessService) startFinalization(roundEndTime time.Time) {
log.Debugf("started finalization stage for round: %s", s.currentRound.Id)
ctx := context.Background()
round := s.currentRound
Expand All @@ -1147,8 +1148,8 @@ func (s *covenantlessService) startFinalization() {
s.startRound()
return
}
time.Sleep(thirdOfRemainingDuration)
s.finalizeRound(notes)

s.finalizeRound(notes, roundEndTime)
}()

if round.IsFailed() {
Expand Down Expand Up @@ -1377,7 +1378,7 @@ func (s *covenantlessService) propagateRoundSigningNoncesGeneratedEvent(combined
s.eventsCh <- ev
}

func (s *covenantlessService) finalizeRound(notes []note.Note) {
func (s *covenantlessService) finalizeRound(notes []note.Note, roundEndTime time.Time) {
defer s.startRound()

ctx := context.Background()
Expand All @@ -1394,6 +1395,16 @@ func (s *covenantlessService) finalizeRound(notes []note.Note) {
}
}()

remainingTime := time.Until(roundEndTime)
// Wait for the remaining forfeit txs to be sent,
// but only wait until the round interval expires.
select {
case <-s.forfeitTxs.doneCh:
log.Debug("all forfeit txs have been sent")
case <-time.After(remainingTime):
log.Debug("timeout waiting for forfeit txs")
}

forfeitTxs, err := s.forfeitTxs.pop()
if err != nil {
changes = round.Fail(fmt.Errorf("failed to finalize round: %s", err))
Expand Down
31 changes: 30 additions & 1 deletion server/internal/core/application/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,20 @@ type forfeitTxsMap struct {
forfeitTxs map[domain.VtxoKey][]string
connectors []string
vtxos []domain.Vtxo

doneCh chan struct{}
doneOnce sync.Once
}

func newForfeitTxsMap(txBuilder ports.TxBuilder) *forfeitTxsMap {
return &forfeitTxsMap{&sync.RWMutex{}, txBuilder, make(map[domain.VtxoKey][]string), nil, nil}
return &forfeitTxsMap{
lock: &sync.RWMutex{},
builder: txBuilder,
forfeitTxs: make(map[domain.VtxoKey][]string),
connectors: nil,
vtxos: nil,
doneCh: make(chan struct{}),
}
}

func (m *forfeitTxsMap) init(connectors []string, requests []domain.TxRequest) {
Expand Down Expand Up @@ -303,6 +313,12 @@ func (m *forfeitTxsMap) sign(txs []string) error {
m.forfeitTxs[vtxoKey] = txs
}

if m.allSigned() {
m.doneOnce.Do(func() {
close(m.doneCh)
})
}

return nil
}

Expand Down Expand Up @@ -332,6 +348,19 @@ func (m *forfeitTxsMap) pop() ([]string, error) {
return txs, nil
}

func (m *forfeitTxsMap) allSigned() bool {
m.lock.RLock()
defer m.lock.RUnlock()

for _, txs := range m.forfeitTxs {
if len(txs) == 0 {
return false
}
}

return true
}

// onchainOutputs iterates over all the nodes' outputs in the vtxo tree and checks their onchain state
// returns the sweepable outputs as ports.SweepInput mapped by their expiration time
func findSweepableOutputs(
Expand Down

0 comments on commit 8a14c54

Please sign in to comment.