Skip to content

Commit

Permalink
chore(zk-code-review): handling errors and depricated calls
Browse files Browse the repository at this point in the history
  • Loading branch information
elliothllm committed Jan 30, 2025
1 parent e393448 commit 5b04168
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 52 deletions.
6 changes: 3 additions & 3 deletions zk/datastream/client/stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestStreamClientReadHeaderEntry(t *testing.T) {
c := NewClient(context.Background(), "", false, 500*time.Millisecond, 0, DefaultEntryChannelSize)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
defer require.NoError(t, c.Stop())

c.conn = conn
go func() {
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestStreamClientReadResultEntry(t *testing.T) {
c := NewClient(context.Background(), "", false, 500*time.Millisecond, 0, DefaultEntryChannelSize)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
defer require.NoError(t, c.Stop())

c.conn = conn
go func() {
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestStreamClientReadFileEntry(t *testing.T) {
c := NewClient(context.Background(), "", false, 500*time.Millisecond, 0, DefaultEntryChannelSize)
server, conn := net.Pipe()
defer server.Close()
defer c.Stop()
defer require.NoError(t, c.Stop())

c.conn = conn
go func() {
Expand Down
10 changes: 7 additions & 3 deletions zk/l1_cache/l1_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func handleRequest(db kv.RwDB) http.HandlerFunc {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Cache-Status", "HIT")
w.Write(cachedResponse)
tx.Commit()
if err = tx.Commit(); err != nil {
return
}
return
}
tx.Rollback()
Expand Down Expand Up @@ -250,11 +252,13 @@ func handleRequest(db kv.RwDB) http.HandlerFunc {
return
}
defer tx.Rollback()
if err := saveToCache(tx, cacheKey, responseBody, cacheDuration); err != nil {
if err = saveToCache(tx, cacheKey, responseBody, cacheDuration); err != nil {
http.Error(w, "Failed to save to cache", http.StatusInternalServerError)
return
}
tx.Commit()
if err = tx.Commit(); err != nil {
return
}
}
}
} else {
Expand Down
13 changes: 9 additions & 4 deletions zk/stages/stage_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,13 @@ func SpawnStageBatches(
if cfg.zkCfg.DebugLimit > 0 {
finishProg, err := stages.GetStageProgress(tx, stages.Finish)
if err != nil {
return err
}
if finishProg >= cfg.zkCfg.DebugLimit {
log.Info(fmt.Sprintf("[%s] Debug limit reached", logPrefix), "finishProg", finishProg, "debugLimit", cfg.zkCfg.DebugLimit)
syscall.Kill(os.Getpid(), syscall.SIGINT)
if err = syscall.Kill(os.Getpid(), syscall.SIGINT); err != nil {
return err
}
}

if stageProgressBlockNo >= cfg.zkCfg.DebugLimit {
Expand Down Expand Up @@ -655,13 +658,15 @@ func PruneBatchesStage(s *stagedsync.PruneState, tx kv.RwTx, cfg BatchesCfg, ctx
return fmt.Errorf("TruncateBlocks: %w", err)
}

if err := hermezDb.DeleteForkIds(0, toBlock); err != nil {
if err = hermezDb.DeleteForkIds(0, toBlock); err != nil {
return fmt.Errorf("DeleteForkIds: %w", err)
}
if err := hermezDb.DeleteBlockBatches(0, toBlock); err != nil {

if err = hermezDb.DeleteBlockBatches(0, toBlock); err != nil {
return fmt.Errorf("DeleteBlockBatches: %w", err)
}
if hermezDb.DeleteBlockGlobalExitRoots(0, toBlock); err != nil {

if err = hermezDb.DeleteBlockGlobalExitRoots(0, toBlock); err != nil {
return fmt.Errorf("DeleteBlockGlobalExitRoots: %w", err)
}

Expand Down
6 changes: 5 additions & 1 deletion zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,11 @@ func sequencingBatchStep(
if !batchState.resequenceBatchJob.HasMoreBlockToProcess() {
for {
if pending, _ := streamWriter.legacyVerifier.HasPendingVerifications(); pending {
streamWriter.CommitNewUpdates()
_, _, err = streamWriter.CommitNewUpdates()
if err != nil {
log.Error(fmt.Sprintf("[%s] Error committing new updates", logPrefix), "error", err)
}

time.Sleep(1 * time.Second)
} else {
break
Expand Down
5 changes: 0 additions & 5 deletions zk/stages/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (

stages "github.com/ledgerwatch/erigon/eth/stagedsync"
stages2 "github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/zk/datastream/server"
)

var (
dataStreamServerFactory = server.NewZkEVMDataStreamServerFactory()
)

func SequencerZkStages(
Expand Down
17 changes: 11 additions & 6 deletions zk/stages/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"io"
"os"

"math/big"
"net/http"
"strconv"
Expand All @@ -13,11 +15,11 @@ import (
"net/url"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/hexutil"
"github.com/ledgerwatch/erigon/core/types"
db2 "github.com/ledgerwatch/erigon/smt/pkg/db"
jsonClient "github.com/ledgerwatch/erigon/zkevm/jsonrpc/client"
jsonTypes "github.com/ledgerwatch/erigon/zkevm/jsonrpc/types"
"github.com/ledgerwatch/erigon-lib/common/hexutil"
)

const (
Expand Down Expand Up @@ -58,7 +60,7 @@ func RpcStateRootByTxNo(rpcUrl string, txNo *big.Int) (*common.Hash, error) {

defer response.Body.Close()

responseBytes, err := ioutil.ReadAll(response.Body)
responseBytes, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -90,7 +92,7 @@ func DumpDb(eridb *db2.EriDb) {
fmt.Println(err)
}
// write to file
if err := ioutil.WriteFile("db.json", op, 0600); err != nil {
if err := os.WriteFile("db.json", op, 0600); err != nil {
fmt.Println(err)
}
}
Expand Down Expand Up @@ -120,10 +122,13 @@ func RpcGetHighestTxNo(rpcEndpoint string) (uint64, error) {
}
defer resp.Body.Close()

body, _ := ioutil.ReadAll(resp.Body)
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, err
}

var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
if err = json.Unmarshal(body, &result); err != nil {
return 0, err
}

Expand Down
32 changes: 17 additions & 15 deletions zk/txpool/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ import (
"github.com/ledgerwatch/erigon-lib/types"
)

const (
logCountPolicyTransactions = "log_count_output" // config variable name
)

// Operation
type Operation byte

Expand Down Expand Up @@ -575,26 +571,28 @@ func ListContentAtACL(ctx context.Context, db kv.RwDB) ([]string, error) {
buffer.WriteString(fmt.Sprint(key, config, "\n"))
}

err := db.View(ctx, func(tx kv.Tx) error {
var err error
if err = db.View(ctx, func(tx kv.Tx) error {
// Config table
buffer.WriteString("\nConfig\n")
err := tx.ForEach(Config, nil, func(k, v []byte) error {
if err = tx.ForEach(Config, nil, func(k, v []byte) error {
buffer.WriteString(fmt.Sprintf("Key: %s, Value: %s\n", string(k), string(v)))
bufferConfig.WriteString(fmt.Sprintf("Key: %s, Value: %s\n", string(k), string(v)))
return nil
})
}); err != nil {
return err
}

// BlockList table
var BlockListContent strings.Builder
err = tx.ForEach(BlockList, nil, func(k, v []byte) error {
if err = tx.ForEach(BlockList, nil, func(k, v []byte) error {
BlockListContent.WriteString(fmt.Sprintf(
"Key: %s, Value: {\n%s\n}\n",
hex.EncodeToString(k),
policyMapping(v, policiesList),
))
return nil
})
if err != nil {
}); err != nil {
return err
}
if BlockListContent.String() != "" {
Expand All @@ -612,15 +610,14 @@ func ListContentAtACL(ctx context.Context, db kv.RwDB) ([]string, error) {
}
// Allowlist table
var AllowlistContent strings.Builder
err = tx.ForEach(Allowlist, nil, func(k, v []byte) error {
if err = tx.ForEach(Allowlist, nil, func(k, v []byte) error {
AllowlistContent.WriteString(fmt.Sprintf(
"Key: %s, Value: {\n%s\n}\n",
hex.EncodeToString(k),
policyMapping(v, policiesList),
))
return nil
})
if err != nil {
}); err != nil {
return err
}
if AllowlistContent.String() != "" {
Expand All @@ -638,7 +635,9 @@ func ListContentAtACL(ctx context.Context, db kv.RwDB) ([]string, error) {
}

return err
})
}); err != nil {
return nil, err
}

combinedBuffers = append(combinedBuffers, buffer.String())
combinedBuffers = append(combinedBuffers, bufferConfig.String())
Expand All @@ -656,7 +655,10 @@ func SetMode(ctx context.Context, aclDB kv.RwDB, mode string) error {
}

return aclDB.Update(ctx, func(tx kv.RwTx) error {
err := tx.Put(Config, []byte(modeKey), []byte(m))
err = tx.Put(Config, []byte(modeKey), []byte(m))
if err != nil {
return err
}

// Timestamp bytes + single byte.
mb := ResolveACLTypeToBinary(mode)
Expand Down
27 changes: 13 additions & 14 deletions zk/txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,10 +1193,10 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) Disca
if found != nil {
tipThreshold := uint256.NewInt(0)
tipThreshold = tipThreshold.Mul(&found.Tx.Tip, uint256.NewInt(100+p.cfg.PriceBump))
tipThreshold.Div(tipThreshold, u256.N100)
tipThreshold.Div(tipThreshold, (*uint256.Int)(u256.N100))
feecapThreshold := uint256.NewInt(0)
feecapThreshold.Mul(&found.Tx.FeeCap, uint256.NewInt(100+p.cfg.PriceBump))
feecapThreshold.Div(feecapThreshold, u256.N100)
feecapThreshold.Div(feecapThreshold, (*uint256.Int)(u256.N100))
if mt.Tx.Tip.Cmp(tipThreshold) < 0 || mt.Tx.FeeCap.Cmp(feecapThreshold) < 0 {
// Both tip and feecap need to be larger than previously to replace the transaction
// In case if the transation is stuck, "poke" it to rebroadcast
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func removeMined(byNonce *BySenderAndNonce, minedTxs []*types.TxSlot, pending *P
// promote reasserts invariants of the subpool and returns the list of transactions that ended up
// being promoted to the pending or basefee pool, for re-broadcasting
func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint64, discard func(*metaTx, DiscardReason), announcements *types.Announcements) {
// Demote worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
// Demote the worst transactions that do not qualify for pending sub pool anymore, to other sub pools, or discard
for worst := pending.Worst(); pending.Len() > 0 && (worst.subPool < BaseFeePoolBits || worst.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) < 0); worst = pending.Worst() {
if worst.subPool >= BaseFeePoolBits {
tx := pending.PopWorst()
Expand All @@ -1348,14 +1348,14 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}
}

// Promote best transactions from base fee pool to pending pool while they qualify
// Promote the best transactions from base fee pool to pending pool while they qualify
for best := baseFee.Best(); baseFee.Len() > 0 && best.subPool >= BaseFeePoolBits && best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0; best = baseFee.Best() {
tx := baseFee.PopBest()
announcements.Append(tx.Tx.Type, tx.Tx.Size, tx.Tx.IDHash[:])
pending.Add(tx)
}

// Demote worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard
// Demote the worst transactions that do not qualify for base fee pool anymore, to queued sub pool, or discard
for worst := baseFee.Worst(); baseFee.Len() > 0 && worst.subPool < BaseFeePoolBits; worst = baseFee.Worst() {
if worst.subPool >= QueuedPoolBits {
queued.Add(baseFee.PopWorst())
Expand All @@ -1364,7 +1364,7 @@ func promote(pending *PendingPool, baseFee, queued *SubPool, pendingBaseFee uint
}
}

// Promote best transactions from the queued pool to either pending or base fee pool, while they qualify
// Promote the best transactions from the queued pool to either pending or base fee pool, while they qualify
for best := queued.Best(); queued.Len() > 0 && best.subPool >= BaseFeePoolBits; best = queued.Best() {
if best.minFeeCap.Cmp(uint256.NewInt(pendingBaseFee)) >= 0 {
tx := queued.PopBest()
Expand Down Expand Up @@ -1499,7 +1499,7 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
continue
}

// Empty rlp can happen if a transaction we want to broadcase has just been mined, for example
// Empty rlp can happen if a transaction we want to broadcast has just been mined, for example
slotsRlp = append(slotsRlp, slotRlp)
if p.IsLocal(hash) {
localTxTypes = append(localTxTypes, t)
Expand Down Expand Up @@ -1581,7 +1581,6 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
delete(p.senders.senderIDs, addr)
}
}
//fmt.Printf("del:%d,%d,%d\n", mt.Tx.senderID, mt.Tx.nonce, mt.Tx.tip)
has, err := tx.Has(kv.PoolTransaction, idHash)
if err != nil {
return err
Expand Down Expand Up @@ -2481,11 +2480,11 @@ func (mt *metaTx) worse(than *metaTx, pendingBaseFee uint256.Int) bool {
return mt.timestamp > than.timestamp
}

func (p BestQueue) Len() int { return len(p.ms) }
func (p BestQueue) Less(i, j int) bool {
func (p *BestQueue) Len() int { return len(p.ms) }
func (p *BestQueue) Less(i, j int) bool {
return p.ms[i].better(p.ms[j], *uint256.NewInt(p.pendingBastFee))
}
func (p BestQueue) Swap(i, j int) {
func (p *BestQueue) Swap(i, j int) {
p.ms[i], p.ms[j] = p.ms[j], p.ms[i]
p.ms[i].bestIndex = i
p.ms[j].bestIndex = j
Expand Down Expand Up @@ -2513,11 +2512,11 @@ type WorstQueue struct {
pendingBaseFee uint64
}

func (p WorstQueue) Len() int { return len(p.ms) }
func (p WorstQueue) Less(i, j int) bool {
func (p *WorstQueue) Len() int { return len(p.ms) }
func (p *WorstQueue) Less(i, j int) bool {
return p.ms[i].worse(p.ms[j], *uint256.NewInt(p.pendingBaseFee))
}
func (p WorstQueue) Swap(i, j int) {
func (p *WorstQueue) Swap(i, j int) {
p.ms[i], p.ms[j] = p.ms[j], p.ms[i]
p.ms[i].worstIndex = i
p.ms[j].worstIndex = j
Expand Down
3 changes: 2 additions & 1 deletion zk/txpool/pool_zk_limbo_persistent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func store(t *testing.T, dbPath string) *TxPool {
cacheView, err := pTarget._stateCache.View(context.Background(), tx)
assert.NilError(t, err)

tx.CreateBucket(TablePoolLimbo)
err = tx.CreateBucket(TablePoolLimbo)
assert.NilError(t, err)
err = pTarget.fromDBLimbo(context.Background(), tx, cacheView)
assert.NilError(t, err)
err = pTarget.fromDBLimbo(context.Background(), tx, cacheView)
Expand Down

0 comments on commit 5b04168

Please sign in to comment.