diff --git a/zp-relayer/configs/commitmentWatcherConfig.ts b/zp-relayer/configs/commitmentWatcherConfig.ts index bf0557f3..dff91337 100644 --- a/zp-relayer/configs/commitmentWatcherConfig.ts +++ b/zp-relayer/configs/commitmentWatcherConfig.ts @@ -3,7 +3,7 @@ import { getBaseConfig } from './baseConfig' import { getGasPriceConfig } from './common/gasPriceConfig' import { getNetworkConfig } from './common/networkConfig' import { getTxManagerConfig } from './common/txManagerConfig' -import { zBooleanString } from './common/utils' +import { zBN, zBooleanString } from './common/utils' const zSchema = z.object({ COMMITMENT_WATCHER_PORT: z.coerce.number().default(8000), @@ -14,7 +14,7 @@ const zSchema = z.object({ COMMITMENT_WATCHER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'), COMMITMENT_WATCHER_FETCH_INTERVAL: z.coerce.number().default(10000), COMMITMENT_WATCHER_TX_REDUNDANCY: zBooleanString().default('false'), - COMMITMENT_WATCHER_FEE: z.coerce.number().default(100_000_000), + COMMITMENT_WATCHER_FEE: zBN().default("100_000_000"), }) const network = getNetworkConfig() diff --git a/zp-relayer/configs/indexerConfig.ts b/zp-relayer/configs/indexerConfig.ts index 5632f848..943a2bd0 100644 --- a/zp-relayer/configs/indexerConfig.ts +++ b/zp-relayer/configs/indexerConfig.ts @@ -10,6 +10,7 @@ const schema = z.object({ INDEXER_STATE_DIR_PATH: z.string().default('./INDEXER_STATE'), INDEXER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'), INDEXER_TOKEN_ADDRESS: z.string(), + INDEXER_BLOCK_CONFIRMATIONS: z.coerce.number().default(1), }) const config = schema.parse(process.env) diff --git a/zp-relayer/pool/BasePool.ts b/zp-relayer/pool/BasePool.ts index 3868025d..ff90742b 100644 --- a/zp-relayer/pool/BasePool.ts +++ b/zp-relayer/pool/BasePool.ts @@ -143,7 +143,7 @@ export abstract class BasePool { return lastBlockNumber } - async syncState(startBlock?: number, indexerUrl?: string) { + async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) { logger.debug('Syncing state; starting from block %d', startBlock) const localIndex = this.state.getNextIndex() @@ -166,10 +166,10 @@ export abstract class BasePool { if (indexerUrl) { await this.syncStateFromIndexer(indexerUrl) - } else if (startBlock) { - await this.syncStateFromContract(startBlock, contractIndex, localIndex) + } else if (startBlock && lastBlock) { + await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex) } else { - throw new Error('Either startBlock or indexerUrl should be provided for sync') + throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync') } const newLocalIndex = this.state.getNextIndex() @@ -217,23 +217,17 @@ export abstract class BasePool { }) } - async syncStateFromContract(startBlock: number, contractIndex: number, localIndex: number) { + async syncStateFromContract(startBlock: number, lastBlock: number, contractIndex: number, localIndex: number) { const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE) if (numTxs < 0) { // TODO: rollback state throw new Error('State is corrupted, contract index is less than local index') } - const missedIndices = Array(numTxs) - for (let i = 0; i < numTxs; i++) { - missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE - } - - const lastBlockNumber = (await this.getLastBlockToProcess()) + 1 for await (const batch of this.network.getEvents({ contract: this.network.pool, startBlock, - lastBlock: lastBlockNumber, + lastBlock, event: 'Message', batchSize: this.config.eventsBatchSize, })) { diff --git a/zp-relayer/pool/DefaultPool.ts b/zp-relayer/pool/DefaultPool.ts index 42a37f16..96d4c411 100644 --- a/zp-relayer/pool/DefaultPool.ts +++ b/zp-relayer/pool/DefaultPool.ts @@ -75,7 +75,8 @@ export class DefaultPool extends BasePool { } await this.permitRecover?.initializeDomain() if (startBlock) { - await this.syncState(startBlock) + const lastBlock = await this.getLastBlockToProcess() + await this.syncState(startBlock, lastBlock) } this.isInitialized = true } diff --git a/zp-relayer/pool/FinalizerPool.ts b/zp-relayer/pool/FinalizerPool.ts index 3c744776..029a8110 100644 --- a/zp-relayer/pool/FinalizerPool.ts +++ b/zp-relayer/pool/FinalizerPool.ts @@ -28,7 +28,7 @@ export class FinalizerPool extends BasePool { this.denominator = toBN(await this.network.pool.call('denominator')) this.poolId = toBN(await this.network.pool.call('pool_id')) - await this.syncState(undefined, indexerUrl) + await this.syncState(undefined, undefined, indexerUrl) this.isInitialized = true } @@ -38,7 +38,7 @@ export class FinalizerPool extends BasePool { async buildFinalizeTx({ transaction: { outCommit }, }: PoolTx): Promise> { - await this.syncState(undefined, this.indexerUrl) + await this.syncState(undefined, undefined, this.indexerUrl) const func = 'proveTreeUpdate(uint256,uint256[8],uint256)' diff --git a/zp-relayer/pool/IndexerPool.ts b/zp-relayer/pool/IndexerPool.ts index 4cbc29fe..a673331e 100644 --- a/zp-relayer/pool/IndexerPool.ts +++ b/zp-relayer/pool/IndexerPool.ts @@ -6,14 +6,14 @@ import { type PermitRecover } from '@/utils/permit/types' export class IndexerPool extends BasePool { public permitRecover: PermitRecover | null = null - async init(startBlock: number | null = null) { + async init(startBlock: number | null = null, lastBlock: number | null = null) { if (this.isInitialized) return this.denominator = toBN(await this.network.pool.call('denominator')) this.poolId = toBN(await this.network.pool.call('pool_id')) - if (startBlock) { - await this.syncState(startBlock) + if (startBlock && lastBlock) { + await this.syncState(startBlock, lastBlock) } this.isInitialized = true } diff --git a/zp-relayer/services/commitment-watcher/index.ts b/zp-relayer/services/commitment-watcher/index.ts index 9a7b2692..5ce49903 100644 --- a/zp-relayer/services/commitment-watcher/index.ts +++ b/zp-relayer/services/commitment-watcher/index.ts @@ -4,10 +4,10 @@ import express from 'express' import { init } from './init' import { createRouter } from './router' -init().then(() => { +init().then((pool) => { const app = express() - app.use(createRouter()) + app.use(createRouter(pool)) const PORT = config.COMMITMENT_WATCHER_PORT app.listen(PORT, () => logger.info(`Started commitment-watcher on port ${PORT}`)) }) diff --git a/zp-relayer/services/commitment-watcher/init.ts b/zp-relayer/services/commitment-watcher/init.ts index 18cb4841..7c4a9046 100644 --- a/zp-relayer/services/commitment-watcher/init.ts +++ b/zp-relayer/services/commitment-watcher/init.ts @@ -113,4 +113,6 @@ export async function init() { workers.forEach(w => w.run()) runWatcher(pool) + + return pool } diff --git a/zp-relayer/services/commitment-watcher/router.ts b/zp-relayer/services/commitment-watcher/router.ts index 8cb4b850..03243265 100644 --- a/zp-relayer/services/commitment-watcher/router.ts +++ b/zp-relayer/services/commitment-watcher/router.ts @@ -1,11 +1,14 @@ import config from '@/configs/commitmentWatcherConfig' import { logger } from '@/lib/appLogger' +import { BasePool } from '@/pool/BasePool' import { poolTxQueue, WorkerTx, WorkerTxType } from '@/queue/poolTxQueue' +import { applyDenominator } from '@/utils/helpers' import { ValidationError } from '@/validation/api/validation' import cors from 'cors' import express, { NextFunction, Request, Response } from 'express' +import { toBN } from 'web3-utils' -export function createRouter() { +export function createRouter(pool: BasePool) { const router = express.Router() router.use(cors()) @@ -26,7 +29,9 @@ export function createRouter() { }) router.get('/fee', (req, res) => { - res.json({ fee: config.COMMITMENT_WATCHER_FEE }) + const dInverse = toBN(1).shln(255) + const fee = applyDenominator(config.COMMITMENT_WATCHER_FEE, pool.denominator.xor(dInverse)) + res.json({ fee: fee.toString(10) }) }) router.get('/job/:commitment', async (req, res) => { diff --git a/zp-relayer/services/indexer/init.ts b/zp-relayer/services/indexer/init.ts index 163b3d30..d582624b 100644 --- a/zp-relayer/services/indexer/init.ts +++ b/zp-relayer/services/indexer/init.ts @@ -12,12 +12,13 @@ export async function init() { eventsBatchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE, }) - await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK)]) + const lastInitialSyncBlock = await pool.getLastBlockToProcess() + await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK, lastInitialSyncBlock)]) - const startBlock = await pool.getLastBlockToProcess() + const startBlock = lastInitialSyncBlock + 1 const watcher = new Watcher(networkBackend, networkBackend.pool, 'pool-indexer', { event: 'allEvents', - blockConfirmations: parseInt(process.env.INDEXER_BLOCK_CONFIRMATIONS || '1'), + blockConfirmations: config.INDEXER_BLOCK_CONFIRMATIONS, startBlock, eventPollingInterval: parseInt(process.env.WATCHER_EVENT_POLLING_INTERVAL || '10000'), batchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,