Skip to content

Commit

Permalink
feat: upgrade cardex (#45)
Browse files Browse the repository at this point in the history
Upgrade to the new version of cardex, which has a web stream-y API
similar to https://github.com/ipld/js-unixfs.
  • Loading branch information
Alan Shaw authored Jun 5, 2023
1 parent 5aefdd7 commit 85fd4cb
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 30 deletions.
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"@ipld/dag-json": "^10.0.1",
"@ipld/dag-pb": "^4.0.2",
"@web3-storage/gateway-lib": "^3.1.1",
"cardex": "^1.0.1",
"cardex": "^2.1.0",
"chardet": "^1.5.0",
"dagula": "^7.0.0",
"magic-bytes.js": "^1.0.12",
Expand Down
9 changes: 4 additions & 5 deletions src/lib/blockstore.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
import { base58btc } from 'multiformats/bases/base58'
import defer from 'p-defer'
import { toIterable } from '@web3-storage/gateway-lib/util'
import { MultiCarIndex, StreamingCarIndex } from './car-index.js'
import { OrderedCarBlockBatcher } from './block-batch.js'

/**
* @typedef {import('multiformats').CID} CID
* @typedef {import('cardex/mh-index-sorted').IndexEntry} IndexEntry
* @typedef {import('cardex/api').IndexItem} IndexEntry
* @typedef {string} MultihashString
* @typedef {import('dagula').Block} Block
* @typedef {import('../bindings').R2Bucket} R2Bucket
Expand All @@ -31,14 +30,14 @@ export class R2Blockstore {
this._dataBucket = dataBucket
this._idx = new MultiCarIndex()
for (const carCid of carCids) {
this._idx.addIndex(carCid, new StreamingCarIndex((async function * () {
this._idx.addIndex(carCid, new StreamingCarIndex(async () => {
const idxPath = `${carCid}/${carCid}.car.idx`
const idxObj = await indexBucket.get(idxPath)
if (!idxObj) {
throw Object.assign(new Error(`index not found: ${carCid}`), { code: 'ERR_MISSING_INDEX' })
}
yield * toIterable(idxObj.body)
})()))
return idxObj.body
}))
}
}

Expand Down
27 changes: 15 additions & 12 deletions src/lib/car-index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { base58btc } from 'multiformats/bases/base58'
import { MultihashIndexSortedReader } from 'cardex'
import { UniversalReader } from 'cardex/universal'
import defer from 'p-defer'

/**
* @typedef {import('multiformats').CID} CID
* @typedef {import('cardex/mh-index-sorted').IndexEntry} IndexEntry
* @typedef {import('cardex/multihash-index-sorted/api').MultihashIndexItem} IndexEntry
* @typedef {string} MultihashString
* @typedef {{ get: (c: CID) => Promise<IndexEntry|undefined> }} CarIndex
*/
Expand Down Expand Up @@ -67,19 +67,22 @@ export class StreamingCarIndex {
/** @type {Error?} */
#buildError = null

/** @param {AsyncIterable<Uint8Array>} stream */
constructor (stream) {
this.#buildIndex(stream)
/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
constructor (fetchIndex) {
this.#buildIndex(fetchIndex)
}

/** @param {AsyncIterable<Uint8Array>} stream */
async #buildIndex (stream) {
console.log('building index')
/** @param {() => Promise<ReadableStream<Uint8Array>>} fetchIndex */
async #buildIndex (fetchIndex) {
this.#building = true
try {
const idxReader = MultihashIndexSortedReader.fromIterable(stream)
for await (const entry of idxReader.entries()) {
if (!entry.multihash) throw new Error('missing entry multihash')
const stream = await fetchIndex()
const idxReader = UniversalReader.createReader({ reader: stream.getReader() })
while (true) {
const { done, value } = await idxReader.read()
if (done) break

const entry = /** @type {IndexEntry} */(value)
const key = mhToKey(entry.multihash.bytes)

// set this value in the index so any future requests for this key get
Expand All @@ -100,7 +103,7 @@ export class StreamingCarIndex {

// signal we are done building the index
this.#building = false
console.log('finished building index')

// resolve any keys in the promised index as "not found" - we're done
// building so they will not get resolved otherwise.
for (const [key, promises] of this.#promisedIdx.entries()) {
Expand Down
10 changes: 6 additions & 4 deletions test/helpers.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* global TransformStream */
import { pack } from 'ipfs-car/pack'
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
Expand Down Expand Up @@ -40,14 +41,15 @@ export class Builder {
*/
async #writeIndex (cid, bytes) {
const indexer = await CarIndexer.fromBytes(bytes)
const { writer, out: index } = MultihashIndexSortedWriter.create()
const { readable, writable } = new TransformStream()
const writer = MultihashIndexSortedWriter.createWriter({ writer: writable.getWriter() })

for await (const entry of indexer) {
writer.put(entry)
writer.add(entry.cid, entry.offset)
}
writer.close()
const indexBytes = concat(await collect(index))
await this.#satnav.put(`${cid}/${cid}.car.idx`, indexBytes)
// @ts-expect-error node web stream is not web stream
await this.#satnav.put(`${cid}/${cid}.car.idx`, readable)
}

/**
Expand Down

0 comments on commit 85fd4cb

Please sign in to comment.