From cc1389d5d5538583c8cfa0262c75e437f9d0cea1 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Tue, 14 May 2024 15:18:02 +0200 Subject: [PATCH 1/3] move rewards update loop out of zinnia loop --- commands/station.js | 45 +++++++++++++- lib/contracts.js | 51 ++++++++++++++++ lib/obj.js | 13 ++++ lib/rewards.js | 31 ++++++++++ lib/zinnia.js | 144 ++++++-------------------------------------- 5 files changed, 156 insertions(+), 128 deletions(-) create mode 100644 lib/contracts.js create mode 100644 lib/obj.js create mode 100644 lib/rewards.js diff --git a/commands/station.js b/commands/station.js index a2f973d1..1206bb04 100644 --- a/commands/station.js +++ b/commands/station.js @@ -9,7 +9,11 @@ import { getStationId } from '../lib/station-id.js' import pRetry from 'p-retry' import { fetch } from 'undici' import { ethAddressFromDelegated } from '@glif/filecoin-address' -import { formatEther } from 'ethers' +import { ethers, formatEther } from 'ethers' +import { Obj } from '../lib/obj.js' +import { runUpdateRewardsLoop } from '../lib/rewards.js' +import { runUpdateContractsLoop } from '../lib/contracts.js' +import { fileURLToPath } from 'node:url' const { FIL_WALLET_ADDRESS, @@ -94,8 +98,33 @@ export const station = async ({ json, experimental }) => { console.error('No experimental modules available at this point') } + const lastTotalJobsCompleted = new Obj(0) + const lastRewardsScheduledForAddress = new Obj() + const contracts = new Obj() + + const fetchRequest = new ethers.FetchRequest( + 'https://api.node.glif.io/rpc/v1' + ) + fetchRequest.setHeader( + 'Authorization', + 'Bearer RXQ2SKH/BVuwN7wisZh3b5uXStGPj1JQIrIWD+rxF0Y=' + ) + const provider = new ethers.JsonRpcProvider( + fetchRequest, + null, + { batchMaxCount: 1 } + ) + const abi = JSON.parse( + await fs.readFile( + fileURLToPath(new URL('../lib/abi.json', import.meta.url)), + 'utf8' + ) + ) + await Promise.all([ zinniaRuntime.run({ + provider, + abi, STATION_ID, FIL_WALLET_ADDRESS: ethAddress, ethAddress, @@ -112,9 +141,19 @@ export const station = async ({ json, experimental }) => { source: activity.source || 'Zinnia' }) }, - onMetrics: m => metrics.submit('zinnia', m) + onMetrics: m => metrics.submit('zinnia', m), + lastTotalJobsCompleted, + lastRewardsScheduledForAddress }), runPingLoop({ STATION_ID }), - runMachinesLoop({ STATION_ID }) + runMachinesLoop({ STATION_ID }), + runUpdateContractsLoop({ provider, abi, contracts }), + runUpdateRewardsLoop({ + contracts, + ethAddress, + onMetrics: m => metrics.submit('zinnia', m), + lastTotalJobsCompleted, + lastRewardsScheduledForAddress + }) ]) } diff --git a/lib/contracts.js b/lib/contracts.js new file mode 100644 index 00000000..811a0c65 --- /dev/null +++ b/lib/contracts.js @@ -0,0 +1,51 @@ +import timers from 'node:timers/promises' +import pRetry from 'p-retry' +import * as Name from 'w3name' +import { ethers } from 'ethers' + +const { + // https://github.com/filecoin-station/contract-addresses + CONTRACT_ADDRESSES_IPNS_KEY = 'k51qzi5uqu5dmaqrefqazad0ca8b24fb79zlacfjw2awdt5gjf2cr6jto5jyqe' +} = process.env + +export const runUpdateContractsLoop = async ({ provider, abi, contracts }) => { + while (true) { + const delay = 10 * 60 * 1000 // 10 minutes + const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds + try { + await timers.setTimeout(delay + jitter) + } catch (err) { + if (err.name === 'AbortError') return + throw err + } + contracts.set(await getContractsWithRetry({ provider, abi })) + } +} + +async function getContractsWithRetry ({ provider, abi }) { + const contractAddresses = await pRetry(getContractAddresses, { + retries: 10, + onFailedAttempt: err => { + console.error(err) + console.error('Failed to get contract addresses. Retrying...') + if (String(err).includes('You are being rate limited')) { + const delaySeconds = 60 + (Math.random() * 60) + // Don't DDOS the w3name services + console.error( + `Rate limited. Waiting ${delaySeconds} seconds...` + ) + return timers.setTimeout(delaySeconds * 1000) + } + } + }) + console.error(`Meridian contract addresses: ${contractAddresses.join(', ')}`) + return contractAddresses.map(address => { + return new ethers.Contract(address, abi, provider) + }) +} + +async function getContractAddresses () { + const name = Name.parse(CONTRACT_ADDRESSES_IPNS_KEY) + const revision = await Name.resolve(name) + return revision.value.split('\n').filter(Boolean) +} diff --git a/lib/obj.js b/lib/obj.js new file mode 100644 index 00000000..f37c5679 --- /dev/null +++ b/lib/obj.js @@ -0,0 +1,13 @@ +export class Obj { + constructor (value = null) { + this._value = value + } + + set (val) { + this._value = val + } + + get () { + return this._value + } +} diff --git a/lib/rewards.js b/lib/rewards.js new file mode 100644 index 00000000..288d2911 --- /dev/null +++ b/lib/rewards.js @@ -0,0 +1,31 @@ +import timers from 'node:timers/promises' + +export const runUpdateRewardsLoop = async ({ contracts, ethAddress, onMetrics, lastTotalJobsCompleted, lastRewardsScheduledForAddress }) => { + while (true) { + while (!contracts.get()) { + await timers.setTimeout(1000) + } + const contractRewards = await Promise.all(contracts.get().map(async contract => { + return getScheduledRewardsWithFallback(contract, ethAddress) + })) + const totalRewards = contractRewards.reduce((a, b) => a + b, 0n) + onMetrics({ + totalJobsCompleted: lastTotalJobsCompleted.get(), + rewardsScheduledForAddress: totalRewards + }) + lastRewardsScheduledForAddress.set(totalRewards) + + const delay = 10 * 60 * 1000 // 10 minutes + const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds + await timers.setTimeout(delay + jitter) + } +} + +async function getScheduledRewardsWithFallback (contract, ethAddress) { + try { + return await contract.rewardsScheduledFor(ethAddress) + } catch (err) { + console.error('Failed to get scheduled rewards:', err.stack) + return 0n + } +} diff --git a/lib/zinnia.js b/lib/zinnia.js index 7ac832a3..8220c173 100644 --- a/lib/zinnia.js +++ b/lib/zinnia.js @@ -2,13 +2,9 @@ import { execa } from 'execa' import Sentry from '@sentry/node' import { installBinaryModule, updateSourceFiles, getBinaryModuleExecutable } from './modules.js' import os from 'node:os' -import { ethers } from 'ethers' -import fs from 'node:fs/promises' -import { fileURLToPath } from 'node:url' import pRetry from 'p-retry' import timers from 'node:timers/promises' import { join } from 'node:path' -import * as Name from 'w3name' const ZINNIA_DIST_TAG = 'v0.19.1' const ZINNIA_MODULES = [ @@ -22,9 +18,7 @@ const ZINNIA_MODULES = [ ] const { TARGET_ARCH = os.arch(), - MODULE_FILTER = '', - // https://github.com/filecoin-station/contract-addresses - CONTRACT_ADDRESSES_IPNS_KEY = 'k51qzi5uqu5dmaqrefqazad0ca8b24fb79zlacfjw2awdt5gjf2cr6jto5jyqe' + MODULE_FILTER = '' } = process.env export const install = () => installBinaryModule({ @@ -72,9 +66,6 @@ const maybeReportErrorToSentry = (/** @type {unknown} */ err) => { Sentry.captureException(err, hint) } -let lastTotalJobsCompleted = 0 -let lastRewardsScheduledForAddress = 0n - const matchesModuleFilter = module => MODULE_FILTER === '' || module === MODULE_FILTER @@ -120,35 +111,6 @@ const updateAllSourceFiles = async ({ return hasUpdated } -async function getContractAddresses () { - const name = Name.parse(CONTRACT_ADDRESSES_IPNS_KEY) - const revision = await Name.resolve(name) - return revision.value.split('\n').filter(Boolean) -} - -async function getContractsWithRetry ({ provider, abi, signal }) { - const contractAddresses = await pRetry(getContractAddresses, { - signal, - retries: 10, - onFailedAttempt: err => { - console.error(err) - console.error('Failed to get contract addresses. Retrying...') - if (String(err).includes('You are being rate limited')) { - const delaySeconds = 60 + (Math.random() * 60) - // Don't DDOS the w3name services - console.error( - `Rate limited. Waiting ${delaySeconds} seconds...` - ) - return timers.setTimeout(delaySeconds * 1000) - } - } - }) - console.error(`Meridian contract addresses: ${contractAddresses.join(', ')}`) - return contractAddresses.map(address => { - return new ethers.Contract(address, abi, provider) - }) -} - const runUpdateSourceFilesLoop = async ({ controller, signal, @@ -193,51 +155,6 @@ const runUpdateSourceFilesLoop = async ({ } } -const runUpdateContractsLoop = async ({ signal, provider, abi, contracts }) => { - while (true) { - const delay = 10 * 60 * 1000 // 10 minutes - const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds - try { - await timers.setTimeout(delay + jitter, null, { signal }) - } catch (err) { - if (err.name === 'AbortError') return - throw err - } - const newContracts = await getContractsWithRetry({ provider, abi, signal }) - contracts.splice(0) - contracts.push(...newContracts) - if (signal.aborted) { - return - } - } -} - -const runUpdateRewardsLoop = async ({ signal, contracts, ethAddress, onMetrics }) => { - while (true) { - const contractRewards = await Promise.all(contracts.map(async contract => { - return getScheduledRewardsWithFallback(contract, ethAddress) - })) - if (signal.aborted) { - return - } - const totalRewards = contractRewards.reduce((a, b) => a + b, 0n) - onMetrics({ - totalJobsCompleted: lastTotalJobsCompleted, - rewardsScheduledForAddress: totalRewards - }) - lastRewardsScheduledForAddress = totalRewards - - const delay = 10 * 60 * 1000 // 10 minutes - const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds - try { - await timers.setTimeout(delay + jitter, null, { signal }) - } catch (err) { - if (err.name === 'AbortError') return - throw err - } - } -} - const catchChildProcessExit = async ({ childProcesses, controller, @@ -288,6 +205,8 @@ const catchChildProcessExit = async ({ } export async function run ({ + provider, + abi, STATION_ID, FIL_WALLET_ADDRESS, ethAddress, @@ -297,28 +216,10 @@ export async function run ({ moduleSourcesDir, onActivity, onMetrics, - isUpdated = false + isUpdated = false, + lastTotalJobsCompleted, + lastRewardsScheduledForAddress }) { - const fetchRequest = new ethers.FetchRequest( - 'https://api.node.glif.io/rpc/v1' - ) - fetchRequest.setHeader( - 'Authorization', - 'Bearer RXQ2SKH/BVuwN7wisZh3b5uXStGPj1JQIrIWD+rxF0Y=' - ) - const provider = new ethers.JsonRpcProvider( - fetchRequest, - null, - { batchMaxCount: 1 } - ) - const abi = JSON.parse( - await fs.readFile( - fileURLToPath(new URL('./abi.json', import.meta.url)), - 'utf8' - ) - ) - - const contracts = await getContractsWithRetry({ provider, abi, signal: null }) const zinniadExe = getBinaryModuleExecutable({ module: 'zinnia', executable: 'zinniad' }) if (!isUpdated) { @@ -388,11 +289,11 @@ export async function run ({ resetTimeout() handleEvents({ module, - contracts, - ethAddress, onActivity, onMetrics, - text: data + text: data, + lastTotalJobsCompleted, + lastRewardsScheduledForAddress }).catch(err => { console.error(err) Sentry.captureException(err) @@ -420,8 +321,6 @@ export async function run ({ moduleVersionsDir, moduleSourcesDir }), - runUpdateContractsLoop({ signal, provider, abi, contracts }), - runUpdateRewardsLoop({ signal, contracts, ethAddress, onMetrics }), catchChildProcessExit({ childProcesses, onActivity, controller }) ]) console.error('Zinnia main loop ended') @@ -435,6 +334,8 @@ export async function run ({ // This infinite recursion has no risk of exceeding the maximum call stack // size, as awaiting promises unwinds the stack return run({ + provider, + abi, STATION_ID, FIL_WALLET_ADDRESS, ethAddress, @@ -444,7 +345,9 @@ export async function run ({ moduleSourcesDir, onActivity, onMetrics, - isUpdated: true + isUpdated: true, + lastTotalJobsCompleted, + lastRewardsScheduledForAddress }) } @@ -452,11 +355,11 @@ const jobsCompleted = {} async function handleEvents ({ module, - contracts, - ethAddress, onActivity, onMetrics, - text + text, + lastTotalJobsCompleted, + lastRewardsScheduledForAddress }) { for (const line of text.trimEnd().split(/\n/g)) { try { @@ -492,9 +395,9 @@ async function handleEvents ({ const totalJobsCompleted = Object.values(jobsCompleted).reduce((a, b) => a + b, 0) onMetrics({ totalJobsCompleted, - rewardsScheduledForAddress: lastRewardsScheduledForAddress + rewardsScheduledForAddress: lastRewardsScheduledForAddress.get() }) - lastTotalJobsCompleted = totalJobsCompleted + lastTotalJobsCompleted.set(totalJobsCompleted) break } @@ -506,12 +409,3 @@ async function handleEvents ({ } } } - -async function getScheduledRewardsWithFallback (contract, ethAddress) { - try { - return await contract.rewardsScheduledFor(ethAddress) - } catch (err) { - console.error('Failed to get scheduled rewards:', err.stack) - return 0n - } -} From ac445c0e42911aa9253f4ddbad8336e4dc2f1fa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 14 May 2024 16:30:01 +0200 Subject: [PATCH 2/3] fix: initialise `lastRewardsScheduledForAddress` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- commands/station.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/station.js b/commands/station.js index 1206bb04..2129c572 100644 --- a/commands/station.js +++ b/commands/station.js @@ -99,7 +99,7 @@ export const station = async ({ json, experimental }) => { } const lastTotalJobsCompleted = new Obj(0) - const lastRewardsScheduledForAddress = new Obj() + const lastRewardsScheduledForAddress = new Obj(0n) const contracts = new Obj() const fetchRequest = new ethers.FetchRequest( From 1672f281ba479de953aa17ba98115baadfdc235c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20Bajto=C5=A1?= Date: Tue, 14 May 2024 16:30:22 +0200 Subject: [PATCH 3/3] fix: print full details for event handler errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miroslav Bajtoš --- lib/zinnia.js | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/zinnia.js b/lib/zinnia.js index 8220c173..1e53a1b5 100644 --- a/lib/zinnia.js +++ b/lib/zinnia.js @@ -362,8 +362,14 @@ async function handleEvents ({ lastRewardsScheduledForAddress }) { for (const line of text.trimEnd().split(/\n/g)) { + let event + try { + event = JSON.parse(line) + } catch (err) { + console.error('Ignoring malformed Zinnia event:', line) + } + try { - const event = JSON.parse(line) switch (event.type) { case 'activity:started': onActivity({ @@ -405,7 +411,8 @@ async function handleEvents ({ console.error('Ignoring Zinnia event of unknown type:', event) } } catch (err) { - console.error('Ignoring malformed Zinnia event:', line) + console.error('Cannot handle Zinnia event: %s', line) + console.error(err) } } }