Skip to content

Commit

Permalink
Fix forkID (#2511)
Browse files Browse the repository at this point in the history
* fix forkID reset

* linter

* sync logs

* leave blockNum column

* update on conflict

* logs

* fix unit test
  • Loading branch information
ARR552 committed Sep 6, 2023
1 parent e0d0780 commit 1bcde69
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 79 deletions.
7 changes: 7 additions & 0 deletions db/migrations/state/0009.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE IF EXISTS state.fork_id DROP CONSTRAINT IF EXISTS fork_id_block_num_fkey;

-- +migrate Down
ALTER TABLE IF EXISTS state.fork_id ADD CONSTRAINT fork_id_block_num_fkey
FOREIGN KEY(block_num)
REFERENCES state.block (block_num) ON DELETE CASCADE;
95 changes: 95 additions & 0 deletions db/migrations/state/0009_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package migrations_test

import (
"database/sql"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// this migration changes length of the token name
type migrationTest0009 struct{}

const addBlock = "INSERT INTO state.block (block_num, received_at, block_hash) VALUES ($1, $2, $3)"

func (m migrationTest0009) InsertData(db *sql.DB) error {
// Insert block to respect the FKey
if _, err := db.Exec(addBlock, 1, time.Now(), "0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f1"); err != nil {
return err
}
if _, err := db.Exec(addBlock, 2, time.Now(), "0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f2"); err != nil {
return err
}

return nil
}

func (m migrationTest0009) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
// Insert forkID
const insertForkID = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version, block_num) VALUES (
1, 10, 1, 'First version', 1
);`
_, err := db.Exec(insertForkID)
assert.NoError(t, err)

const insertForkID2 = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version, block_num) VALUES (
1, 10, 2, 'First version', 10
);`
_, err = db.Exec(insertForkID2)
assert.NoError(t, err)

const insertForkID3 = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version) VALUES (
1, 10, 3, 'First version'
);`
_, err = db.Exec(insertForkID3)
assert.Error(t, err)

const constrainQuery = `select count(*) from pg_constraint c join pg_class t on c.conrelid = t.oid
join pg_namespace n on t.relnamespace = n.oid where c.conname = 'fork_id_block_num_fkey';`
row := db.QueryRow(constrainQuery)
var count int
assert.NoError(t, row.Scan(&count))
assert.Equal(t, 0, count)

_, err = db.Exec(addBlock, 10, time.Now(), "0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f2")
assert.NoError(t, err)
}

func (m migrationTest0009) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
// Insert forkID
const insertForkID = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version, block_num) VALUES (
1, 10, 4, 'First version', 2
);`
_, err := db.Exec(insertForkID)
assert.NoError(t, err)

const insertForkID2 = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version, block_num) VALUES (
1, 10, 5, 'First version', 20
);`
_, err = db.Exec(insertForkID2)
assert.Error(t, err)

const insertForkID3 = `INSERT INTO state.fork_id (
from_batch_num, to_batch_num, fork_id, version) VALUES (
1, 10, 6, 'First version'
);`
_, err = db.Exec(insertForkID3)
assert.Error(t, err)

const constrainQuery = `select count(*) from pg_constraint c join pg_class t on c.conrelid = t.oid
join pg_namespace n on t.relnamespace = n.oid where c.conname = 'fork_id_block_num_fkey';`
row := db.QueryRow(constrainQuery)
var count int
assert.NoError(t, row.Scan(&count))
assert.Equal(t, 1, count)
}

func TestMigration0009(t *testing.T) {
runMigrationTest(t, 9, migrationTest0009{})
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ services:
zkevm-prover:
container_name: zkevm-prover
restart: unless-stopped
image: hermeznetwork/zkevm-prover:v2.2.0
image: hermeznetwork/zkevm-prover:v2.2.1
depends_on:
zkevm-state-db:
condition: service_healthy
Expand Down
25 changes: 2 additions & 23 deletions state/pgstatestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (p *PostgresStorage) Reset(ctx context.Context, blockNumber uint64, dbTx pg
}

// ResetForkID resets the state to reprocess the newer batches with the correct forkID
func (p *PostgresStorage) ResetForkID(ctx context.Context, batchNumber, forkID uint64, version string, dbTx pgx.Tx) error {
func (p *PostgresStorage) ResetForkID(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error {
e := p.getExecQuerier(dbTx)
const resetVirtualStateSQL = "delete from state.block where block_num >=(select min(block_num) from state.virtual_batch where batch_num >= $1)"
if _, err := e.Exec(ctx, resetVirtualStateSQL, batchNumber); err != nil {
Expand All @@ -65,14 +65,6 @@ func (p *PostgresStorage) ResetForkID(ctx context.Context, batchNumber, forkID u
if err != nil {
return err
}
reorg := TrustedReorg{
BatchNumber: batchNumber,
Reason: fmt.Sprintf("New ForkID: %d. Version: %s", forkID, version),
}
err = p.AddTrustedReorg(ctx, &reorg, dbTx)
if err != nil {
return err
}

// Delete proofs for higher batches
const deleteProofsSQL = "delete from state.proof where batch_num >= $1 or (batch_num <= $1 and batch_num_final >= $1)"
Expand Down Expand Up @@ -2446,19 +2438,6 @@ func (p *PostgresStorage) CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64,
return count, nil
}

// GetForkIDTrustedReorgCount returns the forkID
func (p *PostgresStorage) GetForkIDTrustedReorgCount(ctx context.Context, forkID uint64, version string, dbTx pgx.Tx) (uint64, error) {
const forkIDTrustedReorgSQL = "SELECT COUNT(*) FROM state.trusted_reorg WHERE reason=$1"

var count uint64
q := p.getExecQuerier(dbTx)
err := q.QueryRow(ctx, forkIDTrustedReorgSQL, fmt.Sprintf("New ForkID: %d. Version: %s", forkID, version)).Scan(&count)
if err != nil {
return 0, err
}
return count, nil
}

// GetReorgedTransactions returns the transactions that were reorged
func (p *PostgresStorage) GetReorgedTransactions(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) ([]*types.Transaction, error) {
const getReorgedTransactionsSql = "SELECT encoded FROM state.transaction t INNER JOIN state.l2block b ON t.l2_block_num = b.block_num WHERE b.batch_num >= $1 ORDER BY l2_block_num ASC"
Expand Down Expand Up @@ -2524,7 +2503,7 @@ func (p *PostgresStorage) GetBatchByForcedBatchNum(ctx context.Context, forcedBa

// AddForkID adds a new forkID to the storage
func (p *PostgresStorage) AddForkID(ctx context.Context, forkID ForkIDInterval, dbTx pgx.Tx) error {
const addForkIDSQL = "INSERT INTO state.fork_id (from_batch_num, to_batch_num, fork_id, version, block_num) VALUES ($1, $2, $3, $4, $5)"
const addForkIDSQL = "INSERT INTO state.fork_id (from_batch_num, to_batch_num, fork_id, version, block_num) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (fork_id) DO UPDATE SET block_num = $5 WHERE state.fork_id.fork_id = $3;"
e := p.getExecQuerier(dbTx)
_, err := e.Exec(ctx, addForkIDSQL, forkID.FromBatchNumber, forkID.ToBatchNumber, forkID.ForkId, forkID.Version, forkID.BlockNumber)
return err
Expand Down
23 changes: 23 additions & 0 deletions state/pgstatestorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,9 @@ func TestForkIDs(t *testing.T) {
for _, fork := range forks {
err = testState.AddForkID(ctx, fork, dbTx)
require.NoError(t, err)
// Insert twice to test on conflict do nothing
err = testState.AddForkID(ctx, fork, dbTx)
require.NoError(t, err)
}

forkIDs, err := testState.GetForkIDs(ctx, dbTx)
Expand All @@ -510,6 +513,26 @@ func TestForkIDs(t *testing.T) {
require.Equal(t, forkID3.ToBatchNumber, forkIDs[len(forkIDs)-1].ToBatchNumber)
require.Equal(t, forkID3.ForkId, forkIDs[len(forkIDs)-1].ForkId)

forkID3.BlockNumber = 101
err = testState.AddForkID(ctx, forkID3, dbTx)
require.NoError(t, err)
forkIDs, err = testState.GetForkIDs(ctx, dbTx)
require.NoError(t, err)
require.Equal(t, 3, len(forkIDs))
require.Equal(t, forkID3.ToBatchNumber, forkIDs[len(forkIDs)-1].ToBatchNumber)
require.Equal(t, forkID3.ForkId, forkIDs[len(forkIDs)-1].ForkId)
require.Equal(t, forkID3.BlockNumber, forkIDs[len(forkIDs)-1].BlockNumber)

forkID3.BlockNumber = 2
err = testState.AddForkID(ctx, forkID3, dbTx)
require.NoError(t, err)
forkIDs, err = testState.GetForkIDs(ctx, dbTx)
require.NoError(t, err)
require.Equal(t, 3, len(forkIDs))
require.Equal(t, forkID3.ToBatchNumber, forkIDs[len(forkIDs)-1].ToBatchNumber)
require.Equal(t, forkID3.ForkId, forkIDs[len(forkIDs)-1].ForkId)
require.Equal(t, forkID3.BlockNumber, forkIDs[len(forkIDs)-1].BlockNumber)

require.NoError(t, dbTx.Commit(ctx))
}

Expand Down
4 changes: 2 additions & 2 deletions synchronizer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type stateInterface interface {
AddAccumulatedInputHash(ctx context.Context, batchNum uint64, accInputHash common.Hash, dbTx pgx.Tx) error
AddTrustedReorg(ctx context.Context, trustedReorg *state.TrustedReorg, dbTx pgx.Tx) error
GetReorgedTransactions(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) ([]*ethTypes.Transaction, error)
ResetForkID(ctx context.Context, batchNumber, forkID uint64, version string, dbTx pgx.Tx) error
GetForkIDTrustedReorgCount(ctx context.Context, forkID uint64, version string, dbTx pgx.Tx) (uint64, error)
ResetForkID(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
GetForkIDs(ctx context.Context, dbTx pgx.Tx) ([]state.ForkIDInterval, error)
AddForkIDInterval(ctx context.Context, newForkID state.ForkIDInterval, dbTx pgx.Tx) error
SetLastBatchInfoSeenOnEthereum(ctx context.Context, lastBatchNumberSeen, lastBatchNumberVerified uint64, dbTx pgx.Tx) error
SetInitSyncBatch(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) error
Expand Down
34 changes: 18 additions & 16 deletions synchronizer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 25 additions & 35 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,21 @@ func (s *ClientSynchronizer) processForkID(forkID etherman.ForkID, blockNumber u
BlockNumber: blockNumber,
}

// If forkID affects to a batch from the past. State must be reseted.
log.Debugf("ForkID: %d, synchronization must use the new forkID since batch: %d", forkID.ForkID, forkID.BatchNumber+1)
fIds, err := s.state.GetForkIDs(s.ctx, dbTx)
if err != nil {
log.Error("error getting ForkIDTrustedReorg. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state get forkID trusted state. BlockNumber: %d, rollbackErr: %s, error : %v", blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}
if len(fIds) != 0 && fIds[len(fIds)-1].ForkId == fID.ForkId { // If the forkID reset was already done
return nil
}
//If the forkID.batchnumber is a future batch
latestBatchNumber, err := s.state.GetLastBatchNumber(s.ctx, dbTx)
if err != nil && !errors.Is(err, state.ErrStateNotSynchronized) {
Expand All @@ -719,41 +734,26 @@ func (s *ClientSynchronizer) processForkID(forkID etherman.ForkID, blockNumber u
}
return err
}
if latestBatchNumber <= forkID.BatchNumber || s.isTrustedSequencer { //If the forkID will start in a future batch or isTrustedSequencer
log.Infof("Just adding forkID. Skipping reset forkID. ForkID: %+v.", fID)
// Add new forkID to the state
err := s.state.AddForkIDInterval(s.ctx, fID, dbTx)
if err != nil {
log.Error("error adding new forkID interval to the state. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}
return nil
}

// If forkID affects to a batch from the past. State must be reseted.
log.Debugf("ForkID: %d, Reverting synchronization to batch: %d", forkID.ForkID, forkID.BatchNumber+1)
count, err := s.state.GetForkIDTrustedReorgCount(s.ctx, forkID.ForkID, forkID.Version, dbTx)
// Add new forkID to the state
err = s.state.AddForkIDInterval(s.ctx, fID, dbTx)
if err != nil {
log.Error("error getting ForkIDTrustedReorg. Error: ", err)
log.Error("error adding new forkID interval to the state. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state get forkID trusted state. BlockNumber: %d, rollbackErr: %s, error : %v", blockNumber, rollbackErr.Error(), err)
log.Errorf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}
if count > 0 { // If the forkID reset was already done
if latestBatchNumber <= forkID.BatchNumber || s.isTrustedSequencer { //If the forkID will start in a future batch or isTrustedSequencer
log.Infof("Just adding forkID. Skipping reset forkID. ForkID: %+v.", fID)
return nil
}

log.Info("ForkID received in the permissionless node that affects to a batch from the past")
//Reset DB only if permissionless node
err = s.state.ResetForkID(s.ctx, forkID.BatchNumber+1, forkID.ForkID, forkID.Version, dbTx)
log.Debugf("ForkID: %d, Reverting synchronization to batch: %d", forkID.ForkID, forkID.BatchNumber+1)
err = s.state.ResetForkID(s.ctx, forkID.BatchNumber+1, dbTx)
if err != nil {
log.Error("error resetting the state. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
Expand All @@ -764,18 +764,6 @@ func (s *ClientSynchronizer) processForkID(forkID etherman.ForkID, blockNumber u
return err
}

// Add new forkID to the state
err = s.state.AddForkIDInterval(s.ctx, fID, dbTx)
if err != nil {
log.Error("error adding new forkID interval to the state. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %v", blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
return err
}

// Commit because it returns an error to force the resync
err = dbTx.Commit(s.ctx)
if err != nil {
Expand Down Expand Up @@ -1489,6 +1477,8 @@ func (s *ClientSynchronizer) processAndStoreTxs(trustedBatch *types.Batch, reque
}
for _, tx := range processBatchResp.Responses {
if state.IsStateRootChanged(executor.RomErrorCode(tx.RomError)) {
log.Info("TrustedBatch info: %+v", processBatchResp)
log.Info("Storing trusted tx %+v", tx)
if err = s.state.StoreTransaction(s.ctx, uint64(trustedBatch.Number), tx, trustedBatch.Coinbase, uint64(trustedBatch.Timestamp), dbTx); err != nil {
log.Errorf("failed to store transactions for batch: %v. Tx: %s", trustedBatch.Number, tx.TxHash.String())
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions test/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ services:

zkevm-prover:
container_name: zkevm-prover
image: hermeznetwork/zkevm-prover:v2.2.0
image: hermeznetwork/zkevm-prover:v2.2.1
ports:
# - 50051:50051 # Prover
- 50052:50052 # Mock prover
Expand Down Expand Up @@ -414,7 +414,7 @@ services:

zkevm-permissionless-prover:
container_name: zkevm-permissionless-prover
image: hermeznetwork/zkevm-prover:v2.2.0
image: hermeznetwork/zkevm-prover:v2.2.1
ports:
# - 50058:50058 # Prover
- 50059:50052 # Mock prover
Expand Down

0 comments on commit 1bcde69

Please sign in to comment.