diff --git a/api/bin/spark.js b/api/bin/spark.js
index c3388ed8..8792c9ee 100644
--- a/api/bin/spark.js
+++ b/api/bin/spark.js
@@ -1,4 +1,5 @@
 import '../lib/instrument.js'
+import assert from 'node:assert'
 import http from 'node:http'
 import { once } from 'node:events'
 import { createHandler } from '../index.js'
@@ -13,9 +14,16 @@ const {
   HOST = '127.0.0.1',
   DOMAIN = 'localhost',
   DATABASE_URL,
+  DEAL_INGESTER_TOKEN,
   REQUEST_LOGGING = 'true'
 } = process.env
 
+// This token is used by other Spark services to authenticate requests adding new deals
+// to Spark's database of deals eligible for retrieval testing (`POST /eligible-deals-batch`).
+// In production, the value is configured using Fly.io secrets (`fly secrets`).
+// The same token is configured in Fly.io secrets for the deal-observer service too.
+assert(DEAL_INGESTER_TOKEN, 'DEAL_INGESTER_TOKEN is required')
+
 const client = new pg.Pool({
   connectionString: DATABASE_URL,
   // allow the pool to close all connections and become empty
@@ -68,6 +76,7 @@ const logger = {
 const handler = await createHandler({
   client,
   logger,
+  dealIngestionAccessToken: DEAL_INGESTER_TOKEN,
   domain: DOMAIN
 })
 const server = http.createServer(handler)
diff --git a/api/index.js b/api/index.js
index bfec292b..25a31531 100644
--- a/api/index.js
+++ b/api/index.js
@@ -9,7 +9,17 @@ import { recordNetworkInfoTelemetry } from '../common/telemetry.js'
 import { satisfies } from 'compare-versions'
 import { ethAddressFromDelegated } from '@glif/filecoin-address'
 
-const handler = async (req, res, client, domain) => {
+/** @import {IncomingMessage, ServerResponse} from 'node:http' */
+/** @import pg from 'pg' */
+
+/**
+ * @param {IncomingMessage} req
+ * @param {ServerResponse} res
+ * @param {pg.Client} client
+ * @param {string} domain
+ * @param {string} dealIngestionAccessToken
+ */
+const handler = async (req, res, client, domain, dealIngestionAccessToken) => {
   if (req.headers.host.split(':')[0] !== domain) {
     return redirect(res, `https://${domain}${req.url}`)
   }
@@ -36,6 +46,8 @@ const handler = async (req, res, client, domain) => {
     await getSummaryOfEligibleDealsForAllocator(req, res, client, segs[1])
   } else if (segs[0] === 'inspect-request' && req.method === 'GET') {
     await inspectRequest(req, res)
+  } else if (segs[0] === 'eligible-deals-batch' && req.method === 'POST') {
+    await ingestEligibleDeals(req, res, client, dealIngestionAccessToken)
   } else {
     notFound(res)
   }
@@ -399,15 +411,73 @@ export const inspectRequest = async (req, res) => {
   })
 }
 
+/**
+ * @param {IncomingMessage} req
+ * @param {ServerResponse} res
+ * @param {pg.Client} client
+ * @param {string} dealIngestionAccessToken
+ */
+export const ingestEligibleDeals = async (req, res, client, dealIngestionAccessToken) => {
+  if (req.headers.authorization !== `Bearer ${dealIngestionAccessToken}`) {
+    res.statusCode = 403
+    res.end('Unauthorized')
+    return
+  }
+
+  const body = await getRawBody(req, { limit: '100mb' })
+  const deals = JSON.parse(body)
+  assert(Array.isArray(deals), 400, 'Invalid JSON Body, must be an array')
+  for (const d of deals) {
+    validate(d, 'clientId', { type: 'string', required: true })
+    validate(d, 'minerId', { type: 'string', required: true })
+    validate(d, 'pieceCid', { type: 'string', required: true })
+    validate(d, 'pieceSize', { type: 'string', required: true })
+    validate(d, 'payloadCid', { type: 'string', required: true })
+    validate(d, 'expiresAt', { type: 'date', required: true })
+  }
+
+  const { rowCount: ingested } = await client.query(`
+    INSERT INTO eligible_deals (
+      client_id,
+      miner_id,
+      piece_cid,
+      piece_size,
+      payload_cid,
+      expires_at,
+      sourced_from_f05_state
+    ) VALUES (
+      unnest($1::TEXT[]),
+      unnest($2::TEXT[]),
+      unnest($3::TEXT[]),
+      unnest($4::BIGINT[]),
+      unnest($5::TEXT[]),
+      unnest($6::DATE[]),
+      false
+    ) ON CONFLICT DO NOTHING`, [
+    deals.map(d => d.clientId),
+    deals.map(d => d.minerId),
+    deals.map(d => d.pieceCid),
+    deals.map(d => d.pieceSize),
+    deals.map(d => d.payloadCid),
+    deals.map(d => d.expiresAt)
+  ])
+
+  json(res, {
+    ingested,
+    skipped: deals.length - ingested
+  })
+}
+
 export const createHandler = async ({
   client,
   logger,
+  dealIngestionAccessToken,
   domain
 }) => {
   return (req, res) => {
     const start = new Date()
     logger.request(`${req.method} ${req.url} ...`)
-    handler(req, res, client, domain)
+    handler(req, res, client, domain, dealIngestionAccessToken)
       .catch(err => errorHandler(res, err, logger))
       .then(() => {
         logger.request(`${req.method} ${req.url} ${res.statusCode} (${new Date() - start}ms)`)
diff --git a/api/test/test.js b/api/test/test.js
index a5699bae..110ca0b2 100644
--- a/api/test/test.js
+++ b/api/test/test.js
@@ -17,6 +17,8 @@ const participantAddress = '0x000000000000000000000000000000000000dEaD'
 const sparkVersion = '1.13.0' // This must be in sync with the minimum supported client version
 const currentSparkRoundNumber = 42n
 
+const VALID_DEAL_INGESTION_TOKEN = 'authorized-token'
+
 const VALID_MEASUREMENT = {
   cid: 'bafytest',
   providerAddress: '/dns4/localhost/tcp/8080',
@@ -70,6 +72,7 @@ describe('Routes', () => {
         error: console.error,
         request () {}
       },
+      dealIngestionAccessToken: VALID_DEAL_INGESTION_TOKEN,
       domain: '127.0.0.1'
     })
     server = http.createServer(handler)
@@ -773,4 +776,112 @@ describe('Routes', () => {
       })
     })
   })
+
+  describe('POST /eligible-deals-batch', () => {
+    // A miner ID value not found in real data
+    const TEST_MINER_ID = 'f000'
+    // A client ID value not found in real data
+    const TEST_CLIENT_ID = 'f001'
+
+    const AUTH_HEADERS = { authorization: `Bearer ${VALID_DEAL_INGESTION_TOKEN}` }
+
+    beforeEach(async () => {
+      await client.query(
+        'DELETE FROM eligible_deals WHERE miner_id = $1',
+        ['f000']
+      )
+    })
+
+    it('ingests new deals', async () => {
+      const deals = [{
+        minerId: TEST_MINER_ID,
+        clientId: TEST_CLIENT_ID,
+        pieceCid: 'bagaone',
+        pieceSize: '34359738368',
+        payloadCid: 'bafyone',
+        expiresAt: '2100-01-01'
+      }]
+
+      const res = await fetch(`${spark}/eligible-deals-batch`, {
+        method: 'POST',
+        headers: AUTH_HEADERS,
+        body: JSON.stringify(deals)
+      })
+      await assertResponseStatus(res, 200)
+      const body = await res.json()
+
+      assert.deepStrictEqual(body, { ingested: 1, skipped: 0 })
+
+      const { rows } = await client.query(
+        'SELECT * FROM eligible_deals WHERE miner_id = $1',
+        ['f000']
+      )
+      assert.deepStrictEqual(rows, [{
+        miner_id: TEST_MINER_ID,
+        client_id: TEST_CLIENT_ID,
+        piece_cid: 'bagaone',
+        piece_size: '34359738368',
+        payload_cid: 'bafyone',
+        expires_at: new Date('2100-01-01'),
+        sourced_from_f05_state: false
+      }])
+    })
+
+    it('skips deals that were already ingested from f05 state', async () => {
+      const { rows: [f05Deal] } = await client.query(
+        'SELECT * FROM eligible_deals WHERE sourced_from_f05_state = TRUE LIMIT 1'
+      )
+
+      const res = await fetch(`${spark}/eligible-deals-batch`, {
+        method: 'POST',
+        headers: AUTH_HEADERS,
+        body: JSON.stringify([{
+          minerId: f05Deal.miner_id,
+          clientId: f05Deal.client_id,
+          pieceCid: f05Deal.piece_cid,
+          pieceSize: f05Deal.piece_size,
+          payloadCid: f05Deal.payload_cid,
+          expiresAt: f05Deal.expires_at.toISOString()
+        }])
+      })
+      await assertResponseStatus(res, 200)
+      const body = await res.json()
+
+      assert.deepStrictEqual(body, { ingested: 0, skipped: 1 })
+
+      const { rows } = await client.query(`
+        SELECT * FROM eligible_deals WHERE
+          miner_id = $1
+          AND client_id = $2
+          AND piece_cid = $3
+          AND piece_size = $4
+        `, [
+        f05Deal.miner_id,
+        f05Deal.client_id,
+        f05Deal.piece_cid,
+        f05Deal.piece_size
+      ])
+
+      assert.deepStrictEqual(rows, [f05Deal])
+    })
+
+    it('rejects unauthorized requests', async () => {
+      const deals = [{
+        minerId: TEST_MINER_ID,
+        clientId: TEST_CLIENT_ID,
+        pieceCid: 'bagaone',
+        pieceSize: '34359738368',
+        payloadCid: 'bafyone',
+        expiresAt: '2100-01-01'
+      }]
+
+      const res = await fetch(`${spark}/eligible-deals-batch`, {
+        method: 'POST',
+        body: JSON.stringify(deals)
+      })
+      await assertResponseStatus(res, 403)
+      const body = await res.text()
+      assert.strictEqual(body, 'Unauthorized')
+    })
+  })
 })