Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use rollup indexes #46

Merged
merged 13 commits into from
Jun 9, 2023
28 changes: 14 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"type": "module",
"scripts": {
"start": "npm run dev",
"dev": "npm run build:debug && miniflare dist/worker.mjs --watch --debug -m --r2-persist",
"dev": "npm run build:debug && miniflare dist/worker.mjs --watch --debug -m --r2-persist --global-async-io --global-timers",
"build": "esbuild --bundle src/index.js --format=esm --sourcemap --minify --outfile=dist/worker.mjs",
"build:debug": "esbuild --bundle src/index.js --format=esm --outfile=dist/worker.mjs",
"test": "npm run build:debug && node --test --experimental-vm-modules",
Expand All @@ -27,8 +27,8 @@
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-json": "^10.0.1",
"@ipld/dag-pb": "^4.0.2",
"@web3-storage/gateway-lib": "^3.2.0",
"cardex": "^2.1.0",
"@web3-storage/gateway-lib": "^3.2.2",
"cardex": "^2.2.0",
"chardet": "^1.5.0",
"dagula": "^7.0.0",
"magic-bytes.js": "^1.0.12",
Expand All @@ -46,5 +46,10 @@
"miniflare": "^2.9.0",
"standard": "^17.0.0",
"uint8arrays": "^4.0.3"
},
"standard": {
"ignore": [
"*.ts"
]
}
}
5 changes: 4 additions & 1 deletion scripts/r2-put.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Puts data to the local persisted Miniflare R2 buckets.
*
* Usage:
* node scripts/r2-put.js test.jpg --no-wrap
* node scripts/r2-put.js test.jpg --no-wrap --no-rollup
*/
import { R2Bucket } from '@miniflare/r2'
import { FileStorage } from '@miniflare/storage-file'
Expand All @@ -20,5 +20,8 @@ const wrapWithDirectory = process.argv.every(p => p !== '--no-wrap')

// Add the input through the builder; this yields the CID of the data and the
// CIDs of the CAR file(s) it was packed into.
const { dataCid, carCids } = await builder.add(input, { wrapWithDirectory })

// Rollup happens by default; pass --no-rollup on the command line to skip it.
const rollup = !process.argv.includes('--no-rollup')
if (rollup) {
  await builder.rollup(dataCid, carCids)
}

// Report what was produced, one CAR CID per line.
console.log(`Data CID:\n${dataCid}`)
console.log(`CAR CIDs:\n${carCids.join('\n')}`)
20 changes: 17 additions & 3 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { CID } from 'multiformats/cid'
import type { Link } from 'multiformats/link'
import type { Context } from '@web3-storage/gateway-lib'
import type { CARLink } from 'cardex/api'
import type { MemoryBudget } from './lib/mem-budget'

export {}
Expand All @@ -12,8 +13,21 @@ export interface Environment {
MAX_SHARDS: string
}

export interface CarCidsContext extends Context {
carCids: CID[]
/**
 * Describes where a CAR index can be read from: the R2 bucket and object key
 * that hold the index data, plus (optionally) the CAR the index belongs to.
 */
export interface IndexSource {
/** Bucket this index can be found in */
bucket: R2Bucket
/** Bucket key for the source */
key: string
/**
* Origin CAR CID the index source applies to. Will be undefined if the index
* source is a multi-index index, which specifies origin CAR CIDs within the
* index.
*/
origin?: CARLink
}

/**
 * Gateway `Context` extended with the list of index sources to consult.
 */
export interface IndexSourcesContext extends Context {
/**
 * Index sources attached to this context.
 * NOTE(review): presumably populated by the `withIndexSources` middleware —
 * confirm against src/middleware.js.
 */
indexSources: IndexSource[]
}

export interface R2GetOptions {
Expand Down
7 changes: 3 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ import {
} from '@web3-storage/gateway-lib/handlers'
import {
withDagula,
withCarCids,
withIndexSources,
withUnsupportedFeaturesHandler,
withVersionHeader
} from './middleware.js'

/**
* @typedef {import('./bindings').Environment} Environment
* @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} IpfsUrlContext
* @typedef {import('./bindings').CarCidsContext} CarCidsContext
* @typedef {import('@web3-storage/gateway-lib').DagulaContext} DagulaContext
*/

Expand All @@ -41,15 +40,15 @@ export default {
withUnsupportedFeaturesHandler,
withHttpGet,
withParsedIpfsUrl,
withCarCids,
withIndexSources,
withDagula,
withFixedLengthStream
)
return middleware(handler)(request, env, ctx)
}
}

/** @type {import('@web3-storage/gateway-lib').Handler<DagulaContext & CarCidsContext & IpfsUrlContext, Environment>} */
/** @type {import('@web3-storage/gateway-lib').Handler<DagulaContext & IpfsUrlContext, Environment>} */
async function handler (request, env, ctx) {
const { headers } = request
const { searchParams } = ctx
Expand Down
11 changes: 8 additions & 3 deletions src/lib/block-batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ const MAX_BYTES_BETWEEN = 1024 * 1024 * 2
const MAX_BATCH_SIZE = 10

/**
* @typedef {import('multiformats').CID} CID
* @typedef {{ carCid: CID, blockCid: CID, offset: number }} BatchItem
* @typedef {{ add: (i: BatchItem) => void, next: () => BatchItem[] }} BlockBatcher
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {{ carCid: import('cardex/api').CARLink, blockCid: UnknownLink, offset: number }} BatchItem
* @typedef {{ add: (i: BatchItem) => void, remove: (cid: UnknownLink) => void, next: () => BatchItem[] }} BlockBatcher
*/

/**
Expand All @@ -21,6 +21,11 @@ export class OrderedCarBlockBatcher {
this.#queue.push(item)
}

/** @param {UnknownLink} cid */
remove (cid) {
this.#queue = this.#queue.filter(item => item.blockCid.toString() !== cid.toString())
}

next () {
const queue = this.#queue
let prevItem = queue.shift()
Expand Down
38 changes: 15 additions & 23 deletions src/lib/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { MultiCarIndex, StreamingCarIndex } from './car-index.js'
import { OrderedCarBlockBatcher } from './block-batch.js'

/**
* @typedef {import('multiformats').CID} CID
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('cardex/api').IndexItem} IndexEntry
* @typedef {string} MultihashString
* @typedef {import('dagula').Block} Block
Expand All @@ -23,31 +23,22 @@ const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61
export class R2Blockstore {
/**
* @param {R2Bucket} dataBucket
* @param {R2Bucket} indexBucket
* @param {CID[]} carCids
* @param {import('../bindings').IndexSource[]} indexSources
*/
constructor (dataBucket, indexBucket, carCids) {
constructor (dataBucket, indexSources) {
this._dataBucket = dataBucket
this._idx = new MultiCarIndex()
for (const carCid of carCids) {
this._idx.addIndex(carCid, new StreamingCarIndex(async () => {
const idxPath = `${carCid}/${carCid}.car.idx`
const idxObj = await indexBucket.get(idxPath)
if (!idxObj) {
throw Object.assign(new Error(`index not found: ${carCid}`), { code: 'ERR_MISSING_INDEX' })
}
return idxObj.body
}))
for (const src of indexSources) {
this._idx.addIndex(new StreamingCarIndex(src))
}
}

/** @param {CID} cid */
/** @param {UnknownLink} cid */
async get (cid) {
// console.log(`get ${cid}`)
const multiIdxEntry = await this._idx.get(cid)
if (!multiIdxEntry) return
const [carCid, entry] = multiIdxEntry
const carPath = `${carCid}/${carCid}.car`
const entry = await this._idx.get(cid)
if (!entry) return
const carPath = `${entry.origin}/${entry.origin}.car`
const range = { offset: entry.offset }
const res = await this._dataBucket.get(carPath, { range })
if (!res) return
Expand Down Expand Up @@ -188,6 +179,8 @@ export class BatchingR2Blockstore extends R2Blockstore {
}
blocks.forEach(b => b.resolve(block))
pendingBlocks.delete(key)
// remove from batcher if queued to be read
batcher.remove(blockHeader.cid)
}
} catch {
break
Expand All @@ -205,14 +198,13 @@ export class BatchingR2Blockstore extends R2Blockstore {
}
}

/** @param {CID} cid */
/** @param {UnknownLink} cid */
async get (cid) {
// console.log(`get ${cid}`)
const multiIdxEntry = await this._idx.get(cid)
if (!multiIdxEntry) return
const entry = await this._idx.get(cid)
if (!entry) return

const [carCid, entry] = multiIdxEntry
this.#batcher.add({ carCid, blockCid: cid, offset: entry.offset })
this.#batcher.add({ carCid: entry.origin, blockCid: cid, offset: entry.offset })

if (!entry.multihash) throw new Error('missing entry multihash')
const key = mhToKey(entry.multihash.bytes)
Expand Down
48 changes: 27 additions & 21 deletions src/lib/car-index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,38 @@ import { UniversalReader } from 'cardex/universal'
import defer from 'p-defer'

/**
* @typedef {import('multiformats').CID} CID
* @typedef {import('cardex/multihash-index-sorted/api').MultihashIndexItem} IndexEntry
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('cardex/multi-index/api').MultiIndexItem & import('cardex/multihash-index-sorted/api').MultihashIndexItem} IndexEntry
* @typedef {string} MultihashString
* @typedef {{ get: (c: CID) => Promise<IndexEntry|undefined> }} CarIndex
* @typedef {{ get: (c: UnknownLink) => Promise<IndexEntry|undefined> }} CarIndex
*/

export class MultiCarIndex {
/** @type {Map<CID, CarIndex>} */
/** @type {CarIndex[]} */
#idxs

constructor () {
this.#idxs = new Map()
this.#idxs = []
}

/**
* @param {CID} carCid
* @param {CarIndex} index
*/
addIndex (carCid, index) {
this.#idxs.set(carCid, index)
addIndex (index) {
this.#idxs.push(index)
}

/**
* @param {CID} cid
* @returns {Promise<[CID, IndexEntry] | undefined>}
* @param {UnknownLink} cid
* @returns {Promise<IndexEntry | undefined>}
*/
async get (cid) {
const deferred = defer()
const idxEntries = Array.from(this.#idxs.entries())

Promise
.allSettled(idxEntries.map(async ([carCid, idx]) => {
.allSettled(this.#idxs.map(async idx => {
const entry = await idx.get(cid)
if (entry) deferred.resolve([carCid, entry])
if (entry) deferred.resolve(entry)
}))
.then(results => {
// if not already resolved, check for rejections and reject
Expand All @@ -55,6 +53,9 @@ export class MultiCarIndex {
* @implements {CarIndex}
*/
export class StreamingCarIndex {
/** @type {import('../bindings').IndexSource} */
#source

/** @type {Map<MultihashString, IndexEntry>} */
#idx = new Map()

Expand All @@ -67,22 +68,27 @@ export class StreamingCarIndex {
/** @type {Error?} */
#buildError = null

/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
constructor (fetchIndex) {
this.#buildIndex(fetchIndex)
/** @param {import('../bindings').IndexSource} source */
constructor (source) {
this.#source = source
this.#buildIndex()
}

/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
async #buildIndex (fetchIndex) {
async #buildIndex () {
this.#building = true
try {
const stream = await fetchIndex()
const idxReader = UniversalReader.createReader({ reader: stream.getReader() })
const idxObj = await this.#source.bucket.get(this.#source.key)
if (!idxObj) {
throw Object.assign(new Error(`index not found: ${this.#source.key}`), { code: 'ERR_MISSING_INDEX' })
}
const idxReader = UniversalReader.createReader({ reader: idxObj.body.getReader() })
while (true) {
const { done, value } = await idxReader.read()
if (done) break

const entry = /** @type {IndexEntry} */(value)
entry.origin = entry.origin ?? this.#source.origin

const key = mhToKey(entry.multihash.bytes)

// set this value in the index so any future requests for this key get
Expand Down Expand Up @@ -122,7 +128,7 @@ export class StreamingCarIndex {
}
}

/** @param {CID} cid */
/** @param {UnknownLink} cid */
async get (cid) {
if (this.#buildError) {
throw new Error('failed to build index', { cause: this.#buildError })
Expand Down
Loading