feat: deterministic tasking #85

Merged · 5 commits · Jul 24, 2024
7 changes: 7 additions & 0 deletions deps.ts
@@ -15,3 +15,10 @@ export {
HashMismatchError,
validateBlock
} from 'https://cdn.skypack.dev/@web3-storage/[email protected]/?dts'
// cdn.skypack.dev cannot resolve import from @noble/hashes
// jsdelivr.net seems to work better, it's also recommended by drand-client
export {
fetchBeaconByTime,
HttpChainClient,
HttpCachingChain
} from 'https://cdn.jsdelivr.net/npm/[email protected]/index.js/+esm'
36 changes: 36 additions & 0 deletions lib/drand-client.js
@@ -0,0 +1,36 @@
import {
fetchBeaconByTime,
HttpChainClient,
HttpCachingChain
} from '../vendor/deno-deps.js'

// See https://docs.filecoin.io/networks/mainnet#genesis
const FIL_MAINNET_GENESIS_TS = new Date('2020-08-24T22:00:00Z').getTime()
const FIL_MAINNET_BLOCK_TIME = 30_000 // 30 seconds

/** @type {import('https://cdn.skypack.dev/[email protected]/?dts').ChainOptions} */
const DRAND_OPTIONS = {
// FIXME: beacon verification does not work when using drand-client via CDN :(
// Without verification, we are blindly trusting https://api.drand.sh/ to provide honest responses.
// See https://github.com/filecoin-station/spark/issues/86
disableBeaconVerification: true,
noCache: false,
chainVerificationParams: {
// quicknet
chainHash: '52db9ba70e0cc0f6eaf7803dd07447a1f5477735fd3f661792ba94600c84e971',
publicKey: '83cf0f2896adee7eb8b5f01fcad3912212c437e0073e911fb90022d3e760183c8c4b450b6a0a6c3ac6a5776a2d1064510d1fec758c921cc22b0e17e63aaf4bcb5ed66304de9cf809bd274ca73bab4af5a6e9c76a4bc09e76eae8991ef5ece45a'
}
}

const DRAND_URL = `https://api.drand.sh/${DRAND_OPTIONS.chainVerificationParams.chainHash}`
const chain = new HttpCachingChain(DRAND_URL, DRAND_OPTIONS)
const client = new HttpChainClient(chain, DRAND_OPTIONS)

/**
* @param {number} roundStartEpoch
*/
export async function getRandomnessForSparkRound (roundStartEpoch) {
const roundStartedAt = roundStartEpoch * FIL_MAINNET_BLOCK_TIME + FIL_MAINNET_GENESIS_TS
const beacon = await fetchBeaconByTime(client, roundStartedAt)
return beacon.randomness
}
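
To make the epoch-to-timestamp mapping concrete, here is a small illustrative sketch (not part of the diff; the epoch value is the one used in test/drand-client.test.js below):

```js
const FIL_MAINNET_GENESIS_TS = new Date('2020-08-24T22:00:00Z').getTime()
const FIL_MAINNET_BLOCK_TIME = 30_000 // 30 seconds

// Filecoin epoch 4111111 started 4111111 × 30s after mainnet genesis
const roundStartEpoch = 4_111_111
const roundStartedAt = roundStartEpoch * FIL_MAINNET_BLOCK_TIME + FIL_MAINNET_GENESIS_TS
console.log(new Date(roundStartedAt).toISOString()) // 2024-07-22T09:15:30.000Z
// fetchBeaconByTime() then returns the drand beacon that was current at this time.
```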
44 changes: 44 additions & 0 deletions lib/http-assertions.js
@@ -0,0 +1,44 @@
import { AssertionError } from 'zinnia:assert'

/**
* @param {Response} res
* @param {string} [errorMsg]
*/
export async function assertOkResponse (res, errorMsg) {
if (res.ok) return

let body
try {
body = await res.text()
} catch {}
const err = new Error(`${errorMsg ?? 'Fetch failed'} (${res.status}): ${body?.trimEnd()}`)
err.statusCode = res.status
err.serverMessage = body
throw err
}

/**
* @param {Response} res
* @param {string} [errorMsg]
*/
export async function assertRedirectResponse (res, errorMsg) {
if ([301, 302, 303, 304, 307, 308].includes(res.status)) {
const location = res.headers.get('location')
if (!location) {
const msg = (errorMsg ? errorMsg + ' ' : '') +
'The server response is missing the Location header. Headers found:\n' +
Array.from(res.headers.keys()).join('\n')
throw new AssertionError(msg)
}
return
}

let body
try {
body = await res.text()
} catch {}
const err = new Error(`${errorMsg ?? 'Server did not respond with redirect'} (${res.status}): ${body?.trimEnd()}`)
err.statusCode = res.status
err.serverMessage = body
throw err
}
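
A minimal usage sketch of the redirect helper, mirroring how tasker.js calls it below (the error message is illustrative):

```js
import { assertRedirectResponse } from './http-assertions.js'

// With redirect: 'manual', fetch() returns the 3xx response itself instead of
// following it, so we can assert on it and then read the Location header.
const res = await fetch('https://api.filspark.com/rounds/current', {
  redirect: 'manual',
  signal: AbortSignal.timeout(10_000)
})
await assertRedirectResponse(res, 'Failed to find the URL of the current SPARK round')
console.log('Current round:', res.headers.get('location'))
```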
49 changes: 17 additions & 32 deletions lib/spark.js
@@ -3,8 +3,10 @@
import { ActivityState } from './activity-state.js'
import { SPARK_VERSION, MAX_CAR_SIZE, APPROX_ROUND_LENGTH_IN_MS } from './constants.js'
import { queryTheIndex } from './ipni-client.js'
import { assertOkResponse } from './http-assertions.js'
import { getMinerPeerId as defaultGetMinerPeerId } from './miner-info.js'
import { multiaddrToHttpUrl } from './multiaddr.js'
import { Tasker } from './tasker.js'

import {
CarBlockIterator,
@@ -20,33 +22,25 @@ export default class Spark {
#fetch
#getMinerPeerId
#activity = new ActivityState()
#maxTasksPerNode = 360
#tasker

constructor ({
fetch = globalThis.fetch,
getMinerPeerId = defaultGetMinerPeerId
} = {}) {
this.#fetch = fetch
this.#getMinerPeerId = getMinerPeerId
this.#tasker = new Tasker({
fetch: this.#fetch,
activityState: this.#activity
})
}

async getRetrieval () {
console.log('Getting current SPARK round details...')
const res = await this.#fetch('https://api.filspark.com/rounds/current', {
method: 'GET',
headers: { 'Content-Type': 'application/json' },
signal: AbortSignal.timeout(10_000)
})
await assertOkResponse(res, 'Failed to fetch the current SPARK round')
this.#activity.onHealthy()
const { retrievalTasks, maxTasksPerNode, ...round } = await res.json()
console.log('Current SPARK round:', round)
console.log(' %s max tasks per node', maxTasksPerNode ?? '<n/a>')
console.log(' %s retrieval tasks', retrievalTasks.length)
if (maxTasksPerNode) this.#maxTasksPerNode = maxTasksPerNode

const retrieval = retrievalTasks[Math.floor(Math.random() * retrievalTasks.length)]
console.log({ retrieval })
const retrieval = await this.#tasker.next()
if (retrieval) {
console.log({ retrieval })
}
return retrieval
}

@@ -190,7 +184,11 @@
}

async nextRetrieval () {
const { id: retrievalId, ...retrieval } = await this.getRetrieval()
const retrieval = await this.getRetrieval()
if (!retrieval) {
console.log('Completed all tasks for the current round. Waiting for the next round to start.')
return
}

const stats = newStats()

@@ -211,7 +209,7 @@
this.handleRunError(err)
}
const duration = Date.now() - started
const baseDelay = APPROX_ROUND_LENGTH_IN_MS / this.#maxTasksPerNode
const baseDelay = APPROX_ROUND_LENGTH_IN_MS / this.#tasker.maxTasksPerRound
const delay = baseDelay - duration
if (delay > 0) {
console.log('Sleeping for %s seconds before starting the next task...', Math.round(delay / 1000))
@@ -320,16 +318,3 @@ function mapErrorToStatusCode (err) {
// Fallback code for unknown errors
return 600
}

async function assertOkResponse (res, errorMsg) {
if (res.ok) return

let body
try {
body = await res.text()
} catch {}
const err = new Error(`${errorMsg ?? 'Fetch failed'} (${res.status}): ${body.trimEnd()}`)
err.statusCode = res.status
err.serverMessage = body
throw err
}
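
The delay logic above paces tasks evenly across a round. A hedged worked example (the actual APPROX_ROUND_LENGTH_IN_MS value lives in constants.js and is not shown in this diff; 20 minutes is assumed for illustration):

```js
const APPROX_ROUND_LENGTH_IN_MS = 20 * 60_000 // assumption, see constants.js
const maxTasksPerRound = 360 // the Tasker default

const baseDelay = APPROX_ROUND_LENGTH_IN_MS / maxTasksPerRound // ≈ 3333 ms
// If a retrieval took 1200 ms, the node sleeps ~2133 ms before the next task,
// so a node never attempts more than maxTasksPerRound tasks per round.
```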
154 changes: 154 additions & 0 deletions lib/tasker.js
@@ -0,0 +1,154 @@
/* global Zinnia */

import { ActivityState } from './activity-state.js'
import { encodeHex } from '../vendor/deno-deps.js'
import { assertOkResponse, assertRedirectResponse } from './http-assertions.js'
import { getRandomnessForSparkRound } from './drand-client.js'
import { assertEquals, assertInstanceOf } from 'zinnia:assert'

/** @typedef {{cid: string; minerId: string;}} RetrievalTask */
/** @typedef {RetrievalTask & { key: bigint }} KeyedRetrievalTask */

export class Tasker {
#lastRoundUrl
/** @type {RetrievalTask[]} */
#remainingRoundTasks
#fetch
#activity

/**
* @param {object} args
* @param {globalThis.fetch} args.fetch
* @param {ActivityState} args.activityState
*/
constructor ({
fetch = globalThis.fetch,
activityState = new ActivityState()
} = {}) {
this.#fetch = fetch
this.#activity = activityState

this.maxTasksPerRound = 360

// TODO: persist these two values across module restarts
// Without persistence, after the Spark module is restarted, it will start executing the same
// retrieval tasks we have already executed
this.#lastRoundUrl = 'unknown'
this.#remainingRoundTasks = []
}

/**
* @returns {Promise<RetrievalTask | undefined>}
*/
async next () {
await this.#updateCurrentRound()
return this.#remainingRoundTasks.pop()
}

async #updateCurrentRound () {
console.log('Checking the current SPARK round...')
let res = await this.#fetch('https://api.filspark.com/rounds/current', {
method: 'GET',
headers: { 'Content-Type': 'application/json' },
redirect: 'manual',
signal: AbortSignal.timeout(10_000)
})

await assertRedirectResponse(res, 'Failed to find the URL of the current SPARK round')
const roundUrl = res.headers.get('location')
this.#activity.onHealthy()
if (roundUrl === this.#lastRoundUrl) {
console.log('Round did not change since the last iteration')
return
}

console.log('Fetching round details at location %s', roundUrl)
res = await this.#fetch(`https://api.filspark.com${roundUrl}`, {
method: 'GET',
headers: { 'Content-Type': 'application/json' },
signal: AbortSignal.timeout(10_000)
})
await assertOkResponse(res, 'Failed to fetch the current SPARK round')
const { retrievalTasks, maxTasksPerNode, ...round } = await res.json()
console.log('Current SPARK round:', round)
console.log(' %s max tasks per round', maxTasksPerNode ?? '<n/a>')
console.log(' %s retrieval tasks', retrievalTasks.length)
if (maxTasksPerNode) this.maxTasksPerRound = maxTasksPerNode

const randomness = await getRandomnessForSparkRound(round.startEpoch)
console.log(' randomness: %s', randomness)

this.#remainingRoundTasks = await pickTasksForNode({
tasks: retrievalTasks,
maxTasksPerRound: this.maxTasksPerRound,
randomness,
stationId: Zinnia.stationId
})

this.#lastRoundUrl = roundUrl
}
}

const textEncoder = new TextEncoder()

/**
* @param {RetrievalTask} task
* @param {string} randomness
* @returns {Promise<bigint>}
*/
export async function getTaskKey (task, randomness) {
assertEquals(typeof task, 'object', 'task must be an object')
assertEquals(typeof task.cid, 'string', 'task.cid must be a string')
assertEquals(typeof task.minerId, 'string', 'task.minerId must be a string')
assertEquals(typeof randomness, 'string', 'randomness must be a string')

const data = [task.cid, task.minerId, randomness].join('\n')
const hash = await crypto.subtle.digest('sha-256', textEncoder.encode(data))
return BigInt('0x' + encodeHex(hash))
}

/**
* @param {string} stationId
*/
export async function getStationKey (stationId) {
assertEquals(typeof stationId, 'string', 'stationId must be a string')

const hash = await crypto.subtle.digest('sha-256', textEncoder.encode(stationId))
return BigInt('0x' + encodeHex(hash))
}

/**
* @param {object} args
* @param {RetrievalTask[]} args.tasks
* @param {string} args.stationId
* @param {string} args.randomness
* @param {number} args.maxTasksPerRound
* @returns {Promise<RetrievalTask[]>}
*/
export async function pickTasksForNode ({ tasks, stationId, randomness, maxTasksPerRound }) {
assertInstanceOf(tasks, Array, 'tasks must be an array')
assertEquals(typeof stationId, 'string', 'stationId must be a string')
assertEquals(typeof randomness, 'string', 'randomness must be a string')
assertEquals(typeof maxTasksPerRound, 'number', 'maxTasksPerRound must be a number')

const keyedTasks = await Promise.all(tasks.map(
async (t) => ({ ...t, key: await getTaskKey(t, randomness) })
))
const stationKey = await getStationKey(stationId)

/**
* @param {{key: bigint}} a
* @param {{key: bigint}} b
* @returns {number}
*/
const comparator = (a, b) => {
const ad = a.key ^ stationKey
const bd = b.key ^ stationKey
return ad > bd ? 1 : ad < bd ? -1 : 0
}

keyedTasks.sort(comparator)
keyedTasks.splice(maxTasksPerRound)

return keyedTasks.map(({ key, ...t }) => (t))
}
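
A short sketch of how the key derivation above behaves (the task and station id are hypothetical; the randomness string is the value asserted in test/drand-client.test.js):

```js
import { getTaskKey, getStationKey } from './tasker.js'

// Keys are SHA-256 digests interpreted as bigints, so every node derives
// identical keys for the same inputs — that is what makes tasking deterministic.
const taskKey = await getTaskKey(
  { cid: 'bafy-example-cid', minerId: 'f01234' }, // hypothetical task
  'fc90e50dcdf20886b56c038b30fa921a5e57c532ea448dadcc209e44eec0445e'
)
const stationKey = await getStationKey('example-station-id') // hypothetical id
console.log((taskKey ^ stationKey).toString(16)) // XOR distance used for ranking
```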
Comment on lines +99 to +154

Member Author:

This code will need to stay in sync with the fraud-detection code in spark-evaluate. At the same time, I don't expect it to change often. When we do change it, we need to carefully design a migration/deployment path for updating both spark-evaluate and the checker network at the same time. Considering that, I think it's ok to keep this code duplicated between the two GitHub repos (spark and spark-evaluate).

Also in spark-evaluate, I'll need to use a more efficient version using the k-closest module instead of Array.prototype.sort. If we wanted to share that code, I would need to place it into a standalone npm package or else inline the k-closest module into our codebase, otherwise we won't be able to vendor it here.
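
For readers unfamiliar with the optimization being discussed: a hedged sketch (not the actual k-closest npm module's API) of how the top-k selection can avoid sorting all tasks. It assumes the `keyedTasks` array and `stationKey` bigint computed in pickTasksForNode() above.

```js
// Sketch only: select the k tasks whose key is closest to stationKey by XOR
// distance, in O(n·k) instead of O(n·log n) for a full sort. Produces the
// same set as sort()+splice() above (ordering of equal distances may differ).
function pickClosest (keyedTasks, stationKey, k) {
  /** @type {{key: bigint}[]} */
  const picked = [] // kept ordered by XOR distance, at most k entries
  for (const task of keyedTasks) {
    const d = task.key ^ stationKey
    if (picked.length === k && d >= (picked[k - 1].key ^ stationKey)) continue
    // binary insertion keeps `picked` ordered by distance to stationKey
    let lo = 0
    let hi = picked.length
    while (lo < hi) {
      const mid = (lo + hi) >> 1
      if ((picked[mid].key ^ stationKey) < d) lo = mid + 1
      else hi = mid
    }
    picked.splice(lo, 0, task)
    if (picked.length > k) picked.pop()
  }
  return picked
}
```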

Member:

> Also in spark-evaluate, I'll need to use a more efficient version using k-closest module instead of Array.prototype.sort

Does this method produce different results?

Member Author:

It must produce the same results, otherwise the checkers will pick tasks that will not be accepted by spark-evaluate's fraud detection.

I verified this in my PoC by running Array.prototype.sort alongside k-closest and comparing the results. See here:
https://github.com/filecoin-station/spark-evaluate/pull/287/files#diff-0f9e17445925f01057ff062d7d52d2862b081121d6e722cdeda8a2b1ad106ebcR275-R283

Member:

Ok, and have you checked if Array#sort is too slow for spark-evaluate?

Member:

If we can get Dependabot to work for this repo, then I think it would be nicest to share this code in a dependency. If not, inlining like this should be the simplest.

Member Author:

> Ok, and have you checked if Array#sort is too slow for spark-evaluate?

Yes, of course. I don't remember the exact number, though. It was an order of magnitude slower.

> If we can get dependabot to work for this repo, then I think it would be nicest to share this code in a dependency. If not, inlining like this should be the simplest

I don't see any easy way to enable Dependabot here; let's keep this inlined for now.

> If not, can we add comments to spark-evaluate to also update this?

Sure. Would you like me to add a code comment to this file too?

Member:

> Yes, of course. I don't remember the exact number, though. It was an order of magnitude slower.

An order of magnitude slower doesn't mean too slow, though. E.g. if it's 1 ms vs 10 ms, or even 100 ms, it's probably still not worth the extra complexity.

> Sure. Would you like me to add a code comment to this file too?

I don't think it's necessary, since we're going to evolve the algorithm from spark-evaluate and not this repo.

Member Author:

The k-closest version takes 3-4 seconds to finish. The sort-based version took 1-2 minutes, IIRC.

Member Author:

I re-ran the PoC using the Array sort() version. It takes 27.38 seconds to build the list of valid tasks.

Evaluating round 10788n of contract 0x8460766edc62b525fc1fa4d628fc79229dc73031
(...)
EVALUATE ROUND 10788n: using randomness f427ee008087a37142c9c22f6b227a1f915a495648bf3a3494c555c097f0880c
EVALUATE ROUND 10788n: built per-node task lists in 27380ms [Tasks=1000;TN=15;Nodes=61095]

The k-closest version takes 4 seconds to complete.

EVALUATE ROUND 10788n: using randomness f427ee008087a37142c9c22f6b227a1f915a495648bf3a3494c555c097f0880c
EVALUATE ROUND 10788n: built per-node task lists in 4040ms [Tasks=1000;TN=15;Nodes=61095]

For perspective: using the current spark-evaluate main, the entire dry-run evaluation of a single round takes 4.86 seconds.

I thought that increasing the time to evaluate a round from 5 seconds to 33 seconds was not acceptable. However, if you think it's fine, then I am happy to use the Array sort() version in spark-evaluate too.

Member:

Blocking the event loop for 33 seconds is super long, and the service will be unresponsive and not receive events. I agree k-closest is worth it here :)

8 changes: 8 additions & 0 deletions test/drand-client.test.js
@@ -0,0 +1,8 @@
import { test } from 'zinnia:test'
import { assertEquals } from 'zinnia:assert'
import { getRandomnessForSparkRound } from '../lib/drand-client.js'

test('getRandomnessForSparkRound', async () => {
const randomness = await getRandomnessForSparkRound(4111111)
assertEquals(randomness, 'fc90e50dcdf20886b56c038b30fa921a5e57c532ea448dadcc209e44eec0445e')
})
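
Finally, a hedged end-to-end sketch tying the pieces together (the round values and station id are illustrative; a real round comes from api.filspark.com):

```js
import { getRandomnessForSparkRound } from '../lib/drand-client.js'
import { pickTasksForNode } from '../lib/tasker.js'

const round = {
  startEpoch: 4111111, // illustrative
  retrievalTasks: [
    { cid: 'bafy-example-1', minerId: 'f01111' },
    { cid: 'bafy-example-2', minerId: 'f02222' }
  ]
}

const randomness = await getRandomnessForSparkRound(round.startEpoch)
const tasks = await pickTasksForNode({
  tasks: round.retrievalTasks,
  stationId: 'example-station-id', // in production, Zinnia.stationId
  randomness,
  maxTasksPerRound: 15
})
// Deterministic: spark-evaluate can recompute this exact list for fraud detection.
console.log(tasks)
```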