Skip to content

Commit

Permalink
updates job status on queue inclusion (#217)
Browse files Browse the repository at this point in the history
* updates job status on queue inclusion

* Remove additional logic from getTransactionV2

* Added /proverAddress endpoint

* Propagating job validation error

* Job error description

* Trying to restore balance waiting routine

* Optimistic state for sent transactions

* Updating txHash in cached txs, introducing pendingDeltaIndex field in /info endpoint

* Setting reverted state to failed txs

* Clearing pending cache on tx revert

* Fix minor bugs (#218)

* Fix potential indexer gap

* Fix prover fee

* Don't import FeeOptions

* Use applyDenominator

* Move INDEXER_BLOCK_CONFIRMATIONS to config

* Update zp-relayer/workers/sentTxWorker.ts

Co-authored-by: Alexander Filippov <[email protected]>

* Update zp-relayer/pool/RelayPool.ts

Co-authored-by: Alexander Filippov <[email protected]>

* Update zp-relayer/pool/BasePool.ts

Co-authored-by: Alexander Filippov <[email protected]>

* make jobId mandatory for onFailed callback

---------

Co-authored-by: Alexander Filippov <[email protected]>
Co-authored-by: EvgenKor <[email protected]>
  • Loading branch information
3 people authored Jun 14, 2024
1 parent 6ce96ca commit b52416b
Show file tree
Hide file tree
Showing 21 changed files with 185 additions and 71 deletions.
1 change: 1 addition & 0 deletions zp-relayer/common/serviceUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export function buildTxManager(
gasPriceBumpFactor: tmConfig.TX_MIN_GAS_PRICE_BUMP_FACTOR,
gasPriceSurplus: tmConfig.TX_GAS_PRICE_SURPLUS,
gasPriceMaxFeeLimit: tmConfig.TX_MAX_FEE_PER_GAS_LIMIT,
waitingFundsTimeout: tmConfig.BALANCE_CHECK_TIMEOUT
})
} else {
throw new Error('Unsupported network backend')
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/configs/commitmentWatcherConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { getBaseConfig } from './baseConfig'
import { getGasPriceConfig } from './common/gasPriceConfig'
import { getNetworkConfig } from './common/networkConfig'
import { getTxManagerConfig } from './common/txManagerConfig'
import { zBooleanString } from './common/utils'
import { zBN, zBooleanString } from './common/utils'

const zSchema = z.object({
COMMITMENT_WATCHER_PORT: z.coerce.number().default(8000),
Expand All @@ -14,7 +14,7 @@ const zSchema = z.object({
COMMITMENT_WATCHER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
COMMITMENT_WATCHER_FETCH_INTERVAL: z.coerce.number().default(10000),
COMMITMENT_WATCHER_TX_REDUNDANCY: zBooleanString().default('false'),
COMMITMENT_WATCHER_FEE: z.coerce.number().default(100_000_000),
COMMITMENT_WATCHER_FEE: zBN().default("100_000_000"),
})

const network = getNetworkConfig()
Expand Down
2 changes: 2 additions & 0 deletions zp-relayer/configs/common/txManagerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import { zBN } from './utils'
const zBaseConfig = z
.object({
TX_PRIVATE_KEY: z.string(),
RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT: z.coerce.number().default(5000),
})
.transform(o => ({
TX_ADDRESS: new Web3().eth.accounts.privateKeyToAccount(o.TX_PRIVATE_KEY).address,
TX_PRIVATE_KEY: o.TX_PRIVATE_KEY,
BALANCE_CHECK_TIMEOUT: o.RELAYER_INSUFFICIENT_BALANCE_CHECK_TIMEOUT,
}))

const zTxGas = z
Expand Down
1 change: 1 addition & 0 deletions zp-relayer/configs/indexerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const schema = z.object({
INDEXER_STATE_DIR_PATH: z.string().default('./INDEXER_STATE'),
INDEXER_TX_VK_PATH: z.string().default('../params/transfer_verification_key.json'),
INDEXER_TOKEN_ADDRESS: z.string(),
INDEXER_BLOCK_CONFIRMATIONS: z.coerce.number().default(1),
})

const config = schema.parse(process.env)
Expand Down
26 changes: 26 additions & 0 deletions zp-relayer/lib/network/evm/EvmTxManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ import { Mutex } from 'async-mutex'
import BN from 'bn.js'
import type { Redis } from 'ioredis'
import Web3 from 'web3'
import { toBN } from 'web3-utils'
import type { TransactionConfig } from 'web3-core'
import { Logger } from 'winston'
import promiseRetry from 'promise-retry'

export interface EvmTxManagerConfig {
redis: Redis
gasPriceBumpFactor: number
gasPriceSurplus: number
gasPriceMaxFeeLimit: BN | null
waitingFundsTimeout: number
}

type ExtraInfo = TransactionConfig
Expand Down Expand Up @@ -264,4 +267,27 @@ export class EvmTxManager implements TransactionManager<ExtraInfo> {
})
)
}

waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
return promiseRetry(
async retry => {
logger.debug('Getting manager balance')
const newBalance = toBN(await this.web3.eth.getBalance(this.address))
const balanceLog = { balance: newBalance.toString(10), minimumBalance: minimumBalance.toString(10) }
if (newBalance.gte(minimumBalance)) {
logger.info('Relayer has minimum necessary balance', balanceLog)
cb(newBalance)
} else {
logger.warn('Relayer balance is still less than the minimum', balanceLog)
retry(new Error('Not enough balance'))
}
},
{
forever: true,
factor: 1,
maxTimeout: this.config.waitingFundsTimeout,
minTimeout: this.config.waitingFundsTimeout,
}
)
}
}
5 changes: 5 additions & 0 deletions zp-relayer/lib/network/tron/TronTxManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { PreparedTx, SendAttempt, SendError, SendTx, TransactionManager, TxInfo } from '../types'
import BN from 'bn.js'

interface ExtraInfo {}

Expand Down Expand Up @@ -69,4 +70,8 @@ export class TronTxManager implements TransactionManager<ExtraInfo> {
const preparedTx = await this.prepareTx(sendTx)
return this.sendPreparedTx(preparedTx)
}

waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void> {
throw new Error('Method not implemented');
}
}
2 changes: 2 additions & 0 deletions zp-relayer/lib/network/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { TransactionConfig } from 'web3-core'
import type { EthereumContract } from './evm/EvmContract'
import type { TronContract } from './tron/TronContract'
import BN from 'bn.js'

export enum Network {
Tron = 'tron',
Expand Down Expand Up @@ -75,6 +76,7 @@ export interface TransactionManager<E> {
attempt?: SendAttempt<E>
error?: SendError
}>
waitingForFunds(minimumBalance: BN, cb: (balance: BN) => void): Promise<void>;
}

export interface INetworkContract {
Expand Down
22 changes: 8 additions & 14 deletions zp-relayer/pool/BasePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ export abstract class BasePool<N extends Network = Network> {
abstract init(...args: any): Promise<void>

abstract onSend(p: ProcessResult<any>, txHash: string): Promise<void>
abstract onConfirmed(p: ProcessResult<any>, txHash: string, callback?: () => Promise<void>): Promise<void>
abstract onConfirmed(p: ProcessResult<any>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void>

async onFailed(txHash: string): Promise<void> {
async onFailed(txHash: string, jobId: string): Promise<void> {
logger.error('Transaction reverted', { txHash })

await this.clearOptimisticState()
Expand Down Expand Up @@ -143,7 +143,7 @@ export abstract class BasePool<N extends Network = Network> {
return lastBlockNumber
}

async syncState(startBlock?: number, indexerUrl?: string) {
async syncState(startBlock?: number, lastBlock?: number, indexerUrl?: string) {
logger.debug('Syncing state; starting from block %d', startBlock)

const localIndex = this.state.getNextIndex()
Expand All @@ -166,10 +166,10 @@ export abstract class BasePool<N extends Network = Network> {

if (indexerUrl) {
await this.syncStateFromIndexer(indexerUrl)
} else if (startBlock) {
await this.syncStateFromContract(startBlock, contractIndex, localIndex)
} else if (startBlock && lastBlock) {
await this.syncStateFromContract(startBlock, lastBlock, contractIndex, localIndex)
} else {
throw new Error('Either startBlock or indexerUrl should be provided for sync')
throw new Error('Either (startBlock, lastBlock) or indexerUrl should be provided for sync')
}

const newLocalIndex = this.state.getNextIndex()
Expand Down Expand Up @@ -217,23 +217,17 @@ export abstract class BasePool<N extends Network = Network> {
})
}

async syncStateFromContract(startBlock: number, contractIndex: number, localIndex: number) {
async syncStateFromContract(startBlock: number, lastBlock: number, contractIndex: number, localIndex: number) {
const numTxs = Math.floor((contractIndex - localIndex) / OUTPLUSONE)
if (numTxs < 0) {
// TODO: rollback state
throw new Error('State is corrupted, contract index is less than local index')
}

const missedIndices = Array(numTxs)
for (let i = 0; i < numTxs; i++) {
missedIndices[i] = localIndex + (i + 1) * OUTPLUSONE
}

const lastBlockNumber = (await this.getLastBlockToProcess()) + 1
for await (const batch of this.network.getEvents({
contract: this.network.pool,
startBlock,
lastBlock: lastBlockNumber,
lastBlock,
event: 'Message',
batchSize: this.config.eventsBatchSize,
})) {
Expand Down
3 changes: 2 additions & 1 deletion zp-relayer/pool/DefaultPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ export class DefaultPool extends BasePool {
}
await this.permitRecover?.initializeDomain()
if (startBlock) {
await this.syncState(startBlock)
const lastBlock = await this.getLastBlockToProcess()
await this.syncState(startBlock, lastBlock)
}
this.isInitialized = true
}
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/pool/FinalizerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class FinalizerPool extends BasePool {
this.denominator = toBN(await this.network.pool.call('denominator'))
this.poolId = toBN(await this.network.pool.call('pool_id'))

await this.syncState(undefined, indexerUrl)
await this.syncState(undefined, undefined, indexerUrl)

this.isInitialized = true
}
Expand All @@ -45,7 +45,7 @@ export class FinalizerPool extends BasePool {
async buildFinalizeTx({
transaction: { outCommit },
}: PoolTx<WorkerTxType.Finalize>): Promise<ProcessResult<FinalizerPool>> {
await this.syncState(undefined, this.indexerUrl)
await this.syncState(undefined, undefined, this.indexerUrl)

const func = 'proveTreeUpdate(uint256,uint256[8],uint256)'

Expand Down
6 changes: 3 additions & 3 deletions zp-relayer/pool/IndexerPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { type PermitRecover } from '@/utils/permit/types'
export class IndexerPool extends BasePool {
public permitRecover: PermitRecover | null = null

async init(startBlock: number | null = null) {
async init(startBlock: number | null = null, lastBlock: number | null = null) {
if (this.isInitialized) return

this.denominator = toBN(await this.network.pool.call('denominator'))
this.poolId = toBN(await this.network.pool.call('pool_id'))

if (startBlock) {
await this.syncState(startBlock)
if (startBlock && lastBlock) {
await this.syncState(startBlock, lastBlock)
}
this.isInitialized = true
}
Expand Down
52 changes: 42 additions & 10 deletions zp-relayer/pool/RelayPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import config from '@/configs/relayerConfig'
import { logger } from '@/lib/appLogger'
import { Network } from '@/lib/network'
import { redis } from '@/lib/redisClient'
import { PoolTx, WorkerTxType } from '@/queue/poolTxQueue'
import { JobState, PoolTx, poolTxQueue, WorkerTxType } from '@/queue/poolTxQueue'
import { TxStore } from '@/state/TxStore'
import { ENERGY_SIZE, MOCK_CALLDATA, PERMIT2_CONTRACT, TOKEN_SIZE, TRANSFER_INDEX_SIZE } from '@/utils/constants'
import {
Expand Down Expand Up @@ -250,21 +250,53 @@ export class RelayPool extends BasePool<Network> {
}

async onSend({ outCommit, nullifier, memo, commitIndex }: ProcessResult<RelayPool>, txHash: string): Promise<void> {
const prefixedMemo = buildPrefixedMemo(
outCommit,
'0x0000000000000000000000000000000000000000000000000000000000000000',
memo
)

await this.txStore.add(commitIndex, prefixedMemo)

if (nullifier) {
logger.debug('Adding nullifier %s to OS', nullifier)
await this.optimisticState.nullifiers.add([nullifier])
}

await this.cacheTxLocally(commitIndex, outCommit, txHash, memo);
}

async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>): Promise<void> {}
async onConfirmed(res: ProcessResult<RelayPool>, txHash: string, callback?: () => Promise<void>, jobId?: string): Promise<void> {
logger.debug("Updating pool job %s completed, txHash %s", jobId, txHash);
if (jobId) {
const poolJob = await poolTxQueue.getJob(jobId);
if (!poolJob) {
logger.error('Pool job not found', { jobId });
} else {
poolJob.data.transaction.state = JobState.COMPLETED;
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);

await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo);
}
}
}

async onFailed(txHash: string, jobId: string): Promise<void> {
super.onFailed(txHash, jobId);
this.txStore.remove(jobId);
const poolJob = await poolTxQueue.getJob(jobId);
if (!poolJob) {
logger.error('Pool job not found', { jobId });
} else {
poolJob.data.transaction.state = JobState.REVERTED;
poolJob.data.transaction.txHash = txHash;
await poolJob.update(poolJob.data);
}
}

protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) {
// store or updating local tx store
// (we should keep sent transaction until the indexer grab them)
const prefixedMemo = buildPrefixedMemo(
commit,
txHash,
memo
);
await this.txStore.add(index, prefixedMemo);
}

async getIndexerInfo() {
const info = await fetchJson(this.indexerUrl, '/info', [])
Expand Down
4 changes: 2 additions & 2 deletions zp-relayer/services/commitment-watcher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import express from 'express'
import { init } from './init'
import { createRouter } from './router'

init().then(() => {
init().then((pool) => {
const app = express()

app.use(createRouter())
app.use(createRouter(pool))
const PORT = config.COMMITMENT_WATCHER_PORT
app.listen(PORT, () => logger.info(`Started commitment-watcher on port ${PORT}`))
})
2 changes: 2 additions & 0 deletions zp-relayer/services/commitment-watcher/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,6 @@ export async function init() {
workers.forEach(w => w.run())

runWatcher(pool)

return pool
}
9 changes: 7 additions & 2 deletions zp-relayer/services/commitment-watcher/router.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import config from '@/configs/commitmentWatcherConfig'
import { logger } from '@/lib/appLogger'
import { BasePool } from '@/pool/BasePool'
import { poolTxQueue, WorkerTx, WorkerTxType } from '@/queue/poolTxQueue'
import { applyDenominator } from '@/utils/helpers'
import { ValidationError } from '@/validation/api/validation'
import cors from 'cors'
import express, { NextFunction, Request, Response } from 'express'
import { toBN } from 'web3-utils'

export function createRouter() {
export function createRouter(pool: BasePool) {
const router = express.Router()

router.use(cors())
Expand All @@ -26,7 +29,9 @@ export function createRouter() {
})

router.get('/fee', (req, res) => {
res.json({ fee: config.COMMITMENT_WATCHER_FEE })
const dInverse = toBN(1).shln(255)
const fee = applyDenominator(config.COMMITMENT_WATCHER_FEE, pool.denominator.xor(dInverse))
res.json({ fee: fee.toString(10) })
})

router.get('/job/:commitment', async (req, res) => {
Expand Down
7 changes: 4 additions & 3 deletions zp-relayer/services/indexer/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ export async function init() {
eventsBatchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
})

await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK)])
const lastInitialSyncBlock = await pool.getLastBlockToProcess()
await Promise.all([networkBackend.init(), pool.init(config.base.COMMON_START_BLOCK, lastInitialSyncBlock)])

const startBlock = await pool.getLastBlockToProcess()
const startBlock = lastInitialSyncBlock + 1
const watcher = new Watcher(networkBackend, networkBackend.pool, 'pool-indexer', {
event: 'allEvents',
blockConfirmations: parseInt(process.env.INDEXER_BLOCK_CONFIRMATIONS || '1'),
blockConfirmations: config.INDEXER_BLOCK_CONFIRMATIONS,
startBlock,
eventPollingInterval: parseInt(process.env.WATCHER_EVENT_POLLING_INTERVAL || '10000'),
batchSize: config.base.COMMON_EVENTS_PROCESSING_BATCH_SIZE,
Expand Down
Loading

0 comments on commit b52416b

Please sign in to comment.