This repository was archived by the owner on Oct 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmigrate.go
176 lines (143 loc) · 4.4 KB
/
migrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package commander
import (
"context"
"fmt"
"sort"
"github.com/Worldcoin/hubble-commander/client"
"github.com/Worldcoin/hubble-commander/commander/executor"
"github.com/Worldcoin/hubble-commander/models"
"github.com/Worldcoin/hubble-commander/models/dto"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// errMissingBootstrapNodeURL is returned when migration mode is enabled
// but no bootstrap node URL was configured to sync from.
var errMissingBootstrapNodeURL = fmt.Errorf("bootstrap node URL is required for migration mode")

// migrate syncs commander data from the bootstrap node configured in
// c.cfg.Bootstrap. It fails fast if no bootstrap node URL is set.
func (c *Commander) migrate() error {
	url := c.cfg.Bootstrap.BootstrapNodeURL
	if url == nil {
		return errors.WithStack(errMissingBootstrapNodeURL)
	}
	log.Printf("Migration mode is on, syncing data from commander instance running at %s\n", *url)
	return c.migrateCommanderData(client.NewHubble(*url, c.cfg.API.AuthenticationKey))
}
// migrateCommanderData pulls failed transactions, pending batches, and
// pending transactions (in that order) from the given Hubble client, then
// switches migration mode off. The ordering matters: pending transactions
// can only be applied once the pending batches have established the
// pending state.
func (c *Commander) migrateCommanderData(hubble client.Hubble) error {
	// TODO: we probably don't need to sync these?
	failedCount, err := c.syncFailedTxs(hubble)
	if err != nil {
		return err
	}
	log.Printf("Synced %d failed transaction(s)\n", failedCount)

	batchCount, err := c.syncPendingBatches(hubble)
	if err != nil {
		return err
	}
	log.Printf("Synced %d pending batch(es)\n", batchCount)

	// txns must be synced after pending batches, because the mempool is only
	// guaranteed to cleanly apply to the pending state
	txCount, err := c.syncPendingTxs(hubble)
	if err != nil {
		return err
	}
	log.Printf("Synced %d pending transaction(s)\n", txCount)

	c.setMigrate(false)
	return nil
}
// syncPendingTxs fetches every pending transaction from the remote node
// and inserts them into the local mempool, returning how many were synced.
func (c *Commander) syncPendingTxs(hubble client.Hubble) (int, error) {
	pendingTxs, err := hubble.GetPendingTransactions()
	if err != nil {
		return 0, err
	}
	if err := c.addPendingTxs(pendingTxs); err != nil {
		return 0, err
	}
	return pendingTxs.Len(), nil
}
// byNonce adapts a GenericTransactionArray to sort.Interface, ordering
// transactions by ascending nonce.
type byNonce struct {
	array models.GenericTransactionArray
}

// Len reports the number of transactions in the underlying array.
func (b byNonce) Len() int {
	return b.array.Len()
}

// Swap exchanges the transactions at indices i and j.
func (b byNonce) Swap(i, j int) {
	left, right := b.array.At(i), b.array.At(j)
	b.array.Set(i, right)
	b.array.Set(j, left)
}

// Less reports whether the transaction at i has a strictly smaller nonce
// than the one at j.
func (b byNonce) Less(i, j int) bool {
	left, right := b.array.At(i), b.array.At(j)
	// Copy the nonces into addressable locals so Cmp can take a pointer.
	leftNonce := left.GetNonce()
	rightNonce := right.GetNonce()
	// BUG FIX: this previously used `<= 0`. sort.Interface requires Less to
	// be a strict weak ordering; with `<=`, Less(i, j) and Less(j, i) are
	// both true for equal nonces, violating the contract sort relies on.
	return leftNonce.Cmp(&rightNonce) < 0
}
// addPendingTxs sorts the given transactions by nonce, logs each one at
// debug level, and then inserts them one by one into the mempool.
func (c *Commander) addPendingTxs(txs models.GenericTransactionArray) error {
	// these are given to us in a random order, but the mempool will only accept them
	// in the correct order.
	// TODO: this will fail if any transactions are funded by other transactions,
	// we need to throw them into a non-validating mempool and then read them
	// out??
	sort.Sort(byNonce{txs})

	count := txs.Len()
	for i := 0; i < count; i++ {
		tx := txs.At(i)
		nonce := tx.GetNonce()
		log.WithFields(log.Fields{
			"nonce": nonce.Uint64(),
			"from":  tx.GetFromStateID(),
		}).Debug("tx: ", tx)
	}

	for i := 0; i < count; i++ {
		if err := c.storage.AddMempoolTx(txs.At(i)); err != nil {
			// TODO: skip this tx if the error is not a badger error
			return err
		}
	}
	return nil
}
// syncFailedTxs copies all failed transactions from the remote node into
// local storage and reports how many were synced.
func (c *Commander) syncFailedTxs(hubble client.Hubble) (int, error) {
	failedTxs, err := hubble.GetFailedTransactions()
	if err != nil {
		return 0, err
	}
	if err := c.storage.AddFailedTransactions(failedTxs); err != nil {
		return 0, err
	}
	return failedTxs.Len(), nil
}
// syncPendingBatches fetches the remote node's pending batches and executes
// each one locally, returning the number of batches processed. It stops at
// the first batch that fails to apply.
func (c *Commander) syncPendingBatches(hubble client.Hubble) (int, error) {
	batches, err := hubble.GetPendingBatches()
	if err != nil {
		return 0, err
	}
	for i := range batches {
		if err := c.syncPendingBatch(dtoToModelsBatch(&batches[i])); err != nil {
			return 0, err
		}
	}
	return len(batches), nil
}
// syncPendingBatch executes a single pending batch inside a fresh rollup
// loop context and commits it on success.
//
// The named return value err is load-bearing: the deferred Rollback call
// receives a pointer to it, so any error from ExecutePendingBatch or
// Commit is visible to Rollback when the function unwinds — presumably
// triggering a rollback of the context's state on failure (Rollback's
// semantics live in the executor package; confirm there).
func (c *Commander) syncPendingBatch(batch *models.PendingBatch) (err error) {
	ctx := executor.NewRollupLoopContext(c.storage, c.client, c.cfg.Rollup, c.metrics, context.Background(), batch.Type)
	defer ctx.Rollback(&err)
	err = ctx.ExecutePendingBatch(batch)
	if err != nil {
		return err
	}
	return ctx.Commit()
}
// dtoToModelsBatch converts a DTO pending batch into its models
// representation, converting each commitment element-wise.
func dtoToModelsBatch(dtoBatch *dto.PendingBatch) *models.PendingBatch {
	commitments := make([]models.PendingCommitment, 0, len(dtoBatch.Commitments))
	for i := range dtoBatch.Commitments {
		commitments = append(commitments, models.PendingCommitment(dtoBatch.Commitments[i]))
	}
	return &models.PendingBatch{
		ID:              dtoBatch.ID,
		Type:            dtoBatch.Type,
		TransactionHash: dtoBatch.TransactionHash,
		PrevStateRoot:   dtoBatch.PrevStateRoot,
		Commitments:     commitments,
	}
}