Skip to content

Commit

Permalink
feat: add dependencies for index/add invocations (#363)
Browse files Browse the repository at this point in the history
Upgrades to `@web3-storage/upload-api` v13 and adds dependencies for
`index/add` invocations.

Adds multihashes to the Elastic IPFS SQS multihashes queue and adds
block index data to Dynamo `blocks-cars-position` table.

Note: I did not also generate dudewhere/satnav indexes. Hoverboard reads
directly from dynamo and Freeway uses location claims (generated from
dynamo). I don't think we _need_ them anymore.
  • Loading branch information
Alan Shaw authored May 7, 2024
1 parent 05ce2c0 commit c5712d9
Show file tree
Hide file tree
Showing 27 changed files with 453 additions and 19 deletions.
48 changes: 38 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"dependencies": {
"@ipld/dag-json": "^10.1.5",
"@web-std/stream": "^1.0.3",
"@web3-storage/upload-api": "^13.0.2",
"aws-cdk-lib": "2.124.0",
"sst": "^2.40.3"
},
Expand Down
1 change: 1 addition & 0 deletions stacks/upload-api-stack.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ export function UploadApiStack({ stack, app }) {
customDomain,
defaults: {
function: {
timeout: '30 seconds',
permissions: [
allocationTable,
storeTable,
Expand Down
169 changes: 169 additions & 0 deletions upload-api/external-services/ipni-service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import { SQSClient, SendMessageBatchCommand } from '@aws-sdk/client-sqs'
import { DynamoDBClient, BatchWriteItemCommand } from '@aws-sdk/client-dynamodb'
import { marshall } from '@aws-sdk/util-dynamodb'
import { base58btc } from 'multiformats/bases/base58'
import * as Link from 'multiformats/link'
import * as raw from 'multiformats/codecs/raw'
import retry from 'p-retry'
import { ok, error } from '@ucanto/server'

/**
* @typedef {{
* digest: import('multiformats').MultihashDigest,
* location: URL,
* range: [number, number]
* }} BlocksCarsPositionRecord
*/

/**
* @param {{ url: URL, region: string }} multihashesQueueConfig
* @param {{ name: string, region: string }} blocksCarsPositionConfig
*/
export const createIPNIService = (multihashesQueueConfig, blocksCarsPositionConfig) => {
const sqs = new SQSClient(multihashesQueueConfig)
const multihashesQueue = new BlockAdvertisementPublisher({ client: sqs, url: multihashesQueueConfig.url })
const dynamo = new DynamoDBClient(blocksCarsPositionConfig)
const blocksCarsPositionStore = new BlockIndexStore({ client: dynamo, name: blocksCarsPositionConfig.name })
return useIPNIService(multihashesQueue, blocksCarsPositionStore)
}

/**
* @param {BlockAdvertisementPublisher} blockAdvertPublisher
* @param {BlockIndexStore} blockIndexStore
* @returns {import('@web3-storage/upload-api').IPNIService}
*/
export const useIPNIService = (blockAdvertPublisher, blockIndexStore) => ({
/** @param {import('@web3-storage/upload-api').ShardedDAGIndex} index */
async publish (index) {
/** @type {import('multiformats').MultihashDigest[]} */
const items = []
/** @type {BlocksCarsPositionRecord[]} */
const records = []
for (const shard of index.shards.values()) {
for (const [digest, range] of shard.entries()) {
items.push(digest)
records.push({
digest,
location: new URL(`https://w3s.link/ipfs/${Link.create(raw.code, digest)}`),
range
})
}
}

const addRes = await blockAdvertPublisher.addAll(items)
if (addRes.error) return addRes

const putRes = await blockIndexStore.putAll(records)
if (putRes.error) return putRes

return ok({})
}
})

/** The maximum size an SQS batch can be. */
const MAX_QUEUE_BATCH_SIZE = 10

export class BlockAdvertisementPublisher {
#client
#url

/**
* @param {object} config
* @param {SQSClient} config.client
* @param {URL} config.url
*/
constructor (config) {
this.#client = config.client
this.#url = config.url
}

/**
* @param {import('multiformats').MultihashDigest[]} digests
* @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit, import('@ucanto/interface').Failure>>}
*/
async addAll (digests) {
try {
// stringify and dedupe
const items = [...new Set(digests.map(d => base58btc.encode(d.bytes))).values()]
while (true) {
const batch = items.splice(0, MAX_QUEUE_BATCH_SIZE)
if (!batch.length) break

let entries = batch.map(s => ({ Id: s, MessageBody: s }))
await retry(async () => {
const cmd = new SendMessageBatchCommand({
QueueUrl: this.#url.toString(),
Entries: entries
})
const res = await this.#client.send(cmd)
const failures = res.Failed
if (failures?.length) {
failures.forEach(f => console.warn(f))
entries = entries.filter(e => failures.some(f => f.Id === e.Id))
throw new Error('failures in response')
}
})
}
return ok({})
} catch (/** @type {any} */ err) {
return error(err)
}
}
}

/** The maximum size a Dynamo batch can be. */
const MAX_TABLE_BATCH_SIZE = 25

export class BlockIndexStore {
#client
#name

/**
* @param {object} config
* @param {DynamoDBClient} config.client
* @param {string} config.name
*/
constructor (config) {
this.#client = config.client
this.#name = config.name
}

/**
* @param {BlocksCarsPositionRecord[]} records
* @returns {Promise<import('@ucanto/interface').Result<import('@ucanto/interface').Unit, import('@ucanto/interface').Failure>>}
*/
async putAll (records) {
try {
const items = [...records]
while (true) {
const batch = items.splice(0, MAX_TABLE_BATCH_SIZE)
if (!batch.length) break

/** @type {Record<string, import('@aws-sdk/client-dynamodb').WriteRequest[]>} */
let requestItems = {
[this.#name]: batch.map(r => ({
PutRequest: {
Item: marshall({
blockmultihash: base58btc.encode(r.digest.bytes),
carpath: r.location.toString(),
offset: r.range[0],
length: r.range[1]
})
}
}))
}
await retry(async () => {
const cmd = new BatchWriteItemCommand({ RequestItems: requestItems })
const res = await this.#client.send(cmd)
if (res.UnprocessedItems && Object.keys(res.UnprocessedItems).length) {
requestItems = res.UnprocessedItems
throw new Error('unprocessed items')
}
})
}
return ok({})
} catch (/** @type {any} */ err) {
return error(err)
}
}
}
15 changes: 15 additions & 0 deletions upload-api/functions/ucan-invocation-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { createSpaceSnapshotStore } from '@web3-storage/w3infra-billing/tables/s
import { useUsageStore } from '../stores/usage.js'
import { createStripeBillingProvider } from '../billing.js'
import { createTasksScheduler } from '../scheduler.js'
import { createIPNIService } from '../external-services/ipni-service.js'

Sentry.AWSLambda.init({
environment: process.env.SST_STAGE,
Expand Down Expand Up @@ -139,6 +140,8 @@ export async function ucanInvocationRouter(request) {
carparkBucketEndpoint,
carparkBucketAccessKeyId,
carparkBucketSecretAccessKey,
blocksCarsPositionTableConfig,
multihashesQueueConfig,
} = getLambdaEnv()

if (request.body === undefined) {
Expand Down Expand Up @@ -204,11 +207,13 @@ export async function ucanInvocationRouter(request) {
url: uploadServiceURL
})
const tasksScheduler = createTasksScheduler(() => selfConnection)
const ipniService = createIPNIService(multihashesQueueConfig, blocksCarsPositionTableConfig)

const server = createUcantoServer(serviceSigner, {
codec,
allocationsStorage,
blobsStorage,
blobRetriever: blobsStorage,
tasksStorage,
receiptsStorage,
tasksScheduler,
Expand Down Expand Up @@ -265,6 +270,7 @@ export async function ucanInvocationRouter(request) {
plansStorage,
requirePaymentPlan,
usageStorage,
ipniService,
})

const processingCtx = {
Expand Down Expand Up @@ -382,6 +388,15 @@ function getLambdaEnv () {
carparkBucketEndpoint: mustGetEnv('R2_ENDPOINT'),
carparkBucketAccessKeyId: mustGetEnv('R2_ACCESS_KEY_ID'),
carparkBucketSecretAccessKey: mustGetEnv('R2_SECRET_ACCESS_KEY'),
// IPNI service
multihashesQueueConfig: {
url: new URL(mustGetEnv('MULTIHASHES_QUEUE_URL')),
region: mustGetEnv('AWS_REGION')
},
blocksCarsPositionTableConfig: {
name: mustGetEnv('BLOCKS_CAR_POSITION_TABLE_NAME'),
region: mustGetEnv('AWS_REGION')
},
// set for testing
dbEndpoint: process.env.DYNAMO_DB_ENDPOINT,
}
Expand Down
4 changes: 2 additions & 2 deletions upload-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
"@ucanto/validator": "^9.0.2",
"@web-std/fetch": "^4.1.0",
"@web3-storage/access": "^18.3.2",
"@web3-storage/capabilities": "^15.0.0",
"@web3-storage/capabilities": "^17.0.0",
"@web3-storage/did-mailto": "^2.1.0",
"@web3-storage/upload-api": "^11.0.0",
"@web3-storage/upload-api": "^13.0.1",
"multiformats": "^13.1.0",
"nanoid": "^5.0.2",
"preact": "^10.14.1",
Expand Down
Loading

0 comments on commit c5712d9

Please sign in to comment.