Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add reverted field to database and expose telemetrics for reverted events #97

Merged
merged 14 commits into from
Feb 20, 2025
4 changes: 3 additions & 1 deletion backend/bin/deal-observer-backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import '../lib/instrument.js'
import { createInflux } from '../lib/telemetry.js'
import { rpcRequest } from '../lib/rpc-service/service.js'
import { fetchDealWithHighestActivatedEpoch, countStoredActiveDeals, observeBuiltinActorEvents } from '../lib/deal-observer.js'
import { countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js'
import { countRevertedActiveDeals, countStoredActiveDealsWithUnresolvedPayloadCid, lookUpPayloadCids } from '../lib/look-up-payload-cids.js'
import { findAndSubmitUnsubmittedDeals, submitDealsToSparkApi } from '../lib/spark-api-submit-deals.js'
import { getDealPayloadCid } from '../lib/piece-indexer-service.js'
/** @import {Queryable} from '@filecoin-station/deal-observer-db' */
Expand Down Expand Up @@ -46,10 +46,12 @@ const observeActorEventsLoop = async (makeRpcRequest, pgPool) => {
await observeBuiltinActorEvents(pgPool, makeRpcRequest, maxPastEpochs, finalityEpochs)
const newLastInsertedDeal = await fetchDealWithHighestActivatedEpoch(pgPool)
const numberOfStoredDeals = await countStoredActiveDeals(pgPool)
const numberOfRevertedActiveDeals = await countRevertedActiveDeals(pgPool)
if (INFLUXDB_TOKEN) {
recordTelemetry('observed_deals_stats', point => {
point.intField('last_searched_epoch', newLastInsertedDeal.activated_at_epoch)
point.intField('number_of_stored_active_deals', numberOfStoredDeals)
point.intField('number_of_reverted_active_deals', numberOfRevertedActiveDeals)
})
}
} catch (e) {
Expand Down
10 changes: 6 additions & 4 deletions backend/lib/deal-observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ export async function storeActiveDeals (activeDeals, pgPool) {
term_max,
sector_id,
payload_retrievability_state,
last_payload_retrieval_attempt
last_payload_retrieval_attempt,
reverted
)
VALUES (
unnest($1::int[]),
Expand All @@ -94,8 +95,8 @@ export async function storeActiveDeals (activeDeals, pgPool) {
unnest($8::int[]),
unnest($9::bigint[]),
unnest($10::payload_retrievability_state[]),
unnest($11::timestamp[])

unnest($11::timestamp[]),
unnest($12::boolean[])
)
`
await pgPool.query(insertQuery, [
Expand All @@ -109,7 +110,8 @@ export async function storeActiveDeals (activeDeals, pgPool) {
activeDeals.map(deal => deal.term_max),
activeDeals.map(deal => deal.sector_id),
activeDeals.map(deal => deal.payload_retrievability_state),
activeDeals.map(deal => deal.last_payload_retrieval_attempt)
activeDeals.map(deal => deal.last_payload_retrieval_attempt),
activeDeals.map(deal => deal.reverted)
])
} catch (error) {
// If any error occurs, roll back the transaction
Expand Down
10 changes: 10 additions & 0 deletions backend/lib/look-up-payload-cids.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ export async function countStoredActiveDealsWithUnresolvedPayloadCid (pgPool) {
return result.rows[0].count
}

/**
 * Counts active deals whose `reverted` flag is TRUE, i.e. deals created from
 * builtin actor events that the chain later reported as reverted.
 * Rows where `reverted` is NULL (stored before the column existed) are not
 * counted.
 *
 * @param {Queryable} pgPool
 * @returns {Promise<bigint>} number of reverted active deals
 */
export async function countRevertedActiveDeals (pgPool) {
  const query = 'SELECT COUNT(*) FROM active_deals WHERE reverted = TRUE'
  const result = await pgPool.query(query)
  return result.rows[0].count
}

/**
* @param {Queryable} pgPool
* @param {Static<typeof ActiveDealDbEntry>} deal
Expand Down
3 changes: 2 additions & 1 deletion backend/lib/rpc-service/data-types.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ const RawActorEvent = Type.Object({
// A single claim event together with the block context it was emitted in.
// `reverted` is copied verbatim from the raw actor-event entry returned by
// the RPC service (see getActorEvents) and is persisted on the deal row.
// NOTE(review): presumably the RPC sets it when the emitting tipset was
// rolled back by a chain reorg — confirm against the Filecoin RPC docs.
const BlockEvent = Type.Object({
  height: Type.Number(),
  emitter: Type.String(),
  event: ClaimEvent,
  reverted: Type.Boolean()
})

const RpcRespone = Type.Object({
Expand Down
3 changes: 2 additions & 1 deletion backend/lib/rpc-service/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ export async function getActorEvents (actorEventFilter, makeRpcRequest) {
{
height: typedEventEntries.height,
emitter: typedEventEntries.emitter,
event: typedEvent
event: typedEvent,
reverted: typedEventEntries.reverted
}))
continue
}
Expand Down
23 changes: 18 additions & 5 deletions backend/lib/spark-api-submit-deals.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
/** @import {PgPool, Queryable} from '@filecoin-station/deal-observer-db' */
/** @import { Static } from '@sinclair/typebox' */
/** @import { ActiveDealDbEntry, PayloadRetrievabilityStateType } from '@filecoin-station/deal-observer-db/lib/types.js' */
import Cursor from 'pg-cursor'
import * as Sentry from '@sentry/node'
import { Type } from '@sinclair/typebox'
import { Value } from '@sinclair/typebox/value'

/**
* Finds deals that haven't been submitted to the Spark API yet and submits them.
*
* @param {PgPool} pgPool
* @param {number} batchSize
* @param {(eligibleDeals: Array) => Promise<{ingested: number; skipped: number}>} submitDeals
* @param {(eligibleDeals: Array<Static<typeof SubmittableDeal>>) => Promise<{ingested: number; skipped: number}>} submitDeals
* @returns {Promise<{submitted: number; ingested: number; skipped: number;}>} Number of deals submitted, ingested and skipped
*/
export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDeals) => {
Expand Down Expand Up @@ -45,7 +49,7 @@ export const findAndSubmitUnsubmittedDeals = async (pgPool, batchSize, submitDea
*
* @param {PgPool} pgPool
* @param {number} batchSize
* @returns {AsyncGenerator<Array>}
* @returns {AsyncGenerator<Array<Static<typeof SubmittableDeal>>, void, unknown>}
*/
const findUnsubmittedDeals = async function * (pgPool, batchSize) {
const client = await pgPool.connect()
Expand All @@ -72,7 +76,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) {
while (true) {
const rows = await cursor.read(batchSize)
if (rows.length === 0) break
yield rows
yield rows.map(row => Value.Parse(SubmittableDeal, row))
}

client.release()
Expand All @@ -82,7 +86,7 @@ const findUnsubmittedDeals = async function * (pgPool, batchSize) {
* Mark deals as submitted.
*
* @param {Queryable} pgPool
* @param {Array} eligibleDeals
* @param {Array<Static<typeof SubmittableDeal>>} eligibleDeals
*/
const markDealsAsSubmitted = async (pgPool, eligibleDeals) => {
await pgPool.query(`
Expand Down Expand Up @@ -112,7 +116,7 @@ const markDealsAsSubmitted = async (pgPool, eligibleDeals) => {
*
* @param {string} sparkApiBaseURL
* @param {string} sparkApiToken
* @param {Array} deals
* @param {Array<Static<typeof SubmittableDeal>>} deals
* @returns {Promise<{ingested: number; skipped: number}>}
*/
export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deals) => {
Expand Down Expand Up @@ -147,3 +151,12 @@ export const submitDealsToSparkApi = async (sparkApiBaseURL, sparkApiToken, deal

return /** @type {{ingested: number; skipped: number}} */ (await response.json())
}

// Shape of a deal row eligible for submission to the Spark API.
// Used with Value.Parse in findUnsubmittedDeals to validate and coerce the
// raw pg-cursor rows before they are yielded to submitDeals.
const SubmittableDeal = Type.Object({
  miner_id: Type.Number(),
  client_id: Type.Number(),
  piece_cid: Type.String(),
  piece_size: Type.BigInt(),
  payload_cid: Type.String(),
  expires_at: Type.Date()
})
3 changes: 2 additions & 1 deletion backend/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export function convertBlockEventToActiveDealDbEntry (blockEvent) {
sector_id: blockEvent.event.sector,
payload_cid: undefined,
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: undefined
last_payload_retrieval_attempt: undefined,
reverted: blockEvent.reverted
})
}
44 changes: 40 additions & 4 deletions backend/test/deal-observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { PayloadRetrievabilityState } from '@filecoin-station/deal-observer-db/l
import { chainHeadTestData } from './test_data/chainHead.js'
import { rawActorEventTestData } from './test_data/rawActorEvent.js'
import { parse } from '@ipld/dag-json'
import { countRevertedActiveDeals } from '../lib/look-up-payload-cids.js'

describe('deal-observer-backend', () => {
let pgPool
Expand Down Expand Up @@ -38,7 +39,7 @@ describe('deal-observer-backend', () => {
sector: 6n,
payload_cid: undefined
}
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
await storeActiveDeals([dbEntry], pgPool)
const actualData = await loadDeals(pgPool, 'SELECT * FROM active_deals')
Expand All @@ -54,7 +55,8 @@ describe('deal-observer-backend', () => {
sector_id: eventData.sector,
payload_cid: undefined,
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: undefined
last_payload_retrieval_attempt: undefined,
reverted: false
}
assert.deepStrictEqual(actualData, [expectedData])
})
Expand All @@ -73,7 +75,7 @@ describe('deal-observer-backend', () => {
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: undefined
}
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
await storeActiveDeals([dbEntry], pgPool)
const expected = await loadDeals(pgPool, 'SELECT * FROM active_deals')
Expand All @@ -83,7 +85,7 @@ describe('deal-observer-backend', () => {

it('check number of stored deals', async () => {
const storeBlockEvent = async (eventData) => {
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06' })
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted: false })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
await storeActiveDeals([dbEntry], pgPool)
}
Expand All @@ -107,6 +109,40 @@ describe('deal-observer-backend', () => {
await storeBlockEvent(data)
assert.strictEqual(await countStoredActiveDeals(pgPool), 2n)
})

it('check number of reverted stored deals', async () => {
const storeBlockEvent = async (eventData, reverted) => {
const event = Value.Parse(BlockEvent, { height: 1, event: eventData, emitter: 'f06', reverted })
const dbEntry = convertBlockEventToActiveDealDbEntry(event)
await storeActiveDeals([dbEntry], pgPool)
}
const data = {
id: 1,
provider: 2,
client: 3,
pieceCid: 'baga6ea4seaqc4z4432snwkztsadyx2rhoa6rx3wpfzu26365wvcwlb2wyhb5yfi',
pieceSize: 4n,
termStart: 5,
termMin: 12340,
termMax: 12340,
sector: 6n
}
assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n)
await storeBlockEvent(data, false)
assert.strictEqual(await countRevertedActiveDeals(pgPool), 0n)
data.id = 2
data.provider = 3
await storeBlockEvent(data, true)
assert.strictEqual(await countRevertedActiveDeals(pgPool), 1n)
data.id = 3
data.provider = 4
await storeBlockEvent(data, false)
assert.strictEqual(await countRevertedActiveDeals(pgPool), 1n)
data.id = 4
data.provider = 5
await storeBlockEvent(data, true)
assert.strictEqual(await countRevertedActiveDeals(pgPool), 2n)
})
})

describe('deal-observer-backend built in actor event observer', () => {
Expand Down
9 changes: 6 additions & 3 deletions backend/test/look-up-payload-cids.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
sector_id: 1,
payload_cid: undefined,
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: undefined
last_payload_retrieval_attempt: undefined,
reverted: false
})

await storeActiveDeals([deal], pgPool)
Expand Down Expand Up @@ -161,7 +162,8 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
sector_id: 1,
payload_cid: undefined,
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: new Date(now - 1000 * 60 * 60 * 24 * 4)
last_payload_retrieval_attempt: new Date(now - 1000 * 60 * 60 * 24 * 4),
reverted: undefined
})

await storeActiveDeals([deal], pgPool)
Expand Down Expand Up @@ -196,7 +198,8 @@ describe('deal-observer-backend piece indexer payload retrieval', () => {
sector_id: 1,
payload_cid: undefined,
payload_retrievability_state: PayloadRetrievabilityState.NotQueried,
last_payload_retrieval_attempt: new Date(now - 1000 * 60 * 60 * 24 * 4)
last_payload_retrieval_attempt: new Date(now - 1000 * 60 * 60 * 24 * 4),
reverted: false
})

await storeActiveDeals([deal], pgPool)
Expand Down
3 changes: 2 additions & 1 deletion db/lib/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ const ActiveDealDbEntry = Type.Object({
sector_id: Type.BigInt(),
payload_cid: Type.Optional(Type.String()),
payload_retrievability_state: PayloadRetrievabilityStateType,
last_payload_retrieval_attempt: Type.Optional(Type.Date())
last_payload_retrieval_attempt: Type.Optional(Type.Date()),
reverted: Type.Optional(Type.Boolean())
})

export { ActiveDealDbEntry, PayloadRetrievabilityState, PayloadRetrievabilityStateType }
2 changes: 2 additions & 0 deletions db/migrations/011.do.add-reverted-column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Track whether the actor event that produced this deal was reverted
-- (see the `reverted` field on BlockEvent in the backend).
-- Deliberately nullable: rows stored before this migration keep NULL,
-- which the reverted-deals count (WHERE reverted = TRUE) excludes.
ALTER TABLE active_deals
ADD COLUMN reverted BOOLEAN;