Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move rewards update loop out of zinnia loop #462

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions commands/station.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { getStationId } from '../lib/station-id.js'
import pRetry from 'p-retry'
import { fetch } from 'undici'
import { ethAddressFromDelegated } from '@glif/filecoin-address'
import { formatEther } from 'ethers'
import { ethers, formatEther } from 'ethers'
import { Obj } from '../lib/obj.js'
import { runUpdateRewardsLoop } from '../lib/rewards.js'
import { runUpdateContractsLoop } from '../lib/contracts.js'
import { fileURLToPath } from 'node:url'

const {
FIL_WALLET_ADDRESS,
Expand Down Expand Up @@ -94,8 +98,33 @@ export const station = async ({ json, experimental }) => {
console.error('No experimental modules available at this point')
}

const lastTotalJobsCompleted = new Obj(0)
const lastRewardsScheduledForAddress = new Obj(0n)
const contracts = new Obj()

const fetchRequest = new ethers.FetchRequest(
'https://api.node.glif.io/rpc/v1'
)
fetchRequest.setHeader(
'Authorization',
'Bearer RXQ2SKH/BVuwN7wisZh3b5uXStGPj1JQIrIWD+rxF0Y='
)
const provider = new ethers.JsonRpcProvider(
fetchRequest,
null,
{ batchMaxCount: 1 }
)
const abi = JSON.parse(
await fs.readFile(
fileURLToPath(new URL('../lib/abi.json', import.meta.url)),
'utf8'
)
)

await Promise.all([
zinniaRuntime.run({
provider,
abi,
STATION_ID,
FIL_WALLET_ADDRESS: ethAddress,
ethAddress,
Expand All @@ -112,9 +141,19 @@ export const station = async ({ json, experimental }) => {
source: activity.source || 'Zinnia'
})
},
onMetrics: m => metrics.submit('zinnia', m)
onMetrics: m => metrics.submit('zinnia', m),
lastTotalJobsCompleted,
lastRewardsScheduledForAddress
}),
runPingLoop({ STATION_ID }),
runMachinesLoop({ STATION_ID })
runMachinesLoop({ STATION_ID }),
runUpdateContractsLoop({ provider, abi, contracts }),
runUpdateRewardsLoop({
contracts,
ethAddress,
onMetrics: m => metrics.submit('zinnia', m),
lastTotalJobsCompleted,
lastRewardsScheduledForAddress
})
])
}
51 changes: 51 additions & 0 deletions lib/contracts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import timers from 'node:timers/promises'
import pRetry from 'p-retry'
import * as Name from 'w3name'
import { ethers } from 'ethers'

const {
// https://github.com/filecoin-station/contract-addresses
CONTRACT_ADDRESSES_IPNS_KEY = 'k51qzi5uqu5dmaqrefqazad0ca8b24fb79zlacfjw2awdt5gjf2cr6jto5jyqe'
} = process.env

export const runUpdateContractsLoop = async ({ provider, abi, contracts }) => {
while (true) {
const delay = 10 * 60 * 1000 // 10 minutes
const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds
try {
await timers.setTimeout(delay + jitter)
} catch (err) {
if (err.name === 'AbortError') return
throw err
}
contracts.set(await getContractsWithRetry({ provider, abi }))
}
}

async function getContractsWithRetry ({ provider, abi }) {
const contractAddresses = await pRetry(getContractAddresses, {
retries: 10,
onFailedAttempt: err => {
console.error(err)
console.error('Failed to get contract addresses. Retrying...')
if (String(err).includes('You are being rate limited')) {
const delaySeconds = 60 + (Math.random() * 60)
// Don't DDOS the w3name services
console.error(
`Rate limited. Waiting ${delaySeconds} seconds...`
)
return timers.setTimeout(delaySeconds * 1000)
}
}
})
console.error(`Meridian contract addresses: ${contractAddresses.join(', ')}`)
return contractAddresses.map(address => {
return new ethers.Contract(address, abi, provider)
})
}

async function getContractAddresses () {
const name = Name.parse(CONTRACT_ADDRESSES_IPNS_KEY)
const revision = await Name.resolve(name)
return revision.value.split('\n').filter(Boolean)
}
13 changes: 13 additions & 0 deletions lib/obj.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export class Obj {
constructor (value = null) {
this._value = value
}

set (val) {
this._value = val
}

get () {
return this._value
}
}
31 changes: 31 additions & 0 deletions lib/rewards.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import timers from 'node:timers/promises'

export const runUpdateRewardsLoop = async ({ contracts, ethAddress, onMetrics, lastTotalJobsCompleted, lastRewardsScheduledForAddress }) => {
while (true) {
while (!contracts.get()) {
await timers.setTimeout(1000)
}
const contractRewards = await Promise.all(contracts.get().map(async contract => {
return getScheduledRewardsWithFallback(contract, ethAddress)
}))
const totalRewards = contractRewards.reduce((a, b) => a + b, 0n)
onMetrics({
totalJobsCompleted: lastTotalJobsCompleted.get(),
rewardsScheduledForAddress: totalRewards
})
lastRewardsScheduledForAddress.set(totalRewards)

const delay = 10 * 60 * 1000 // 10 minutes
const jitter = Math.random() * 20_000 - 10_000 // +- 10 seconds
await timers.setTimeout(delay + jitter)
}
}

async function getScheduledRewardsWithFallback (contract, ethAddress) {
try {
return await contract.rewardsScheduledFor(ethAddress)
} catch (err) {
console.error('Failed to get scheduled rewards:', err.stack)
return 0n
}
}
Loading