Skip to content

Commit

Permalink
fix: remove max Content-Length middleware (#16)
Browse files Browse the repository at this point in the history
Also removes the memory budget code (which has no bearing on actual memory usage!).
  • Loading branch information
Alan Shaw authored Oct 21, 2022
1 parent 5e205be commit bf11d0c
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 249 deletions.
30 changes: 15 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"@web3-storage/gateway-lib": "^2.0.0",
"cardex": "^1.0.0",
"chardet": "^1.5.0",
"dagula": "^4.1.0",
"dagula": "^4.1.1",
"magic-bytes.js": "^1.0.12",
"mrmime": "^1.0.1",
"multiformats": "^9.9.0",
Expand Down
4 changes: 0 additions & 4 deletions src/bindings.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ export interface CarCidsContext extends Context {
carCids: CID[]
}

export interface MemoryBudgetContext extends Context {
memoryBudget: MemoryBudget
}

export interface R2GetOptions {
range?: {
offset: number
Expand Down
15 changes: 2 additions & 13 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import {
withDagula,
withCarCids,
withUnsupportedFeaturesHandler,
withMemoryBudget,
withResponseMemoryRelease,
withVersionHeader,
withMaxContentLength
withVersionHeader
} from './middleware.js'

/**
Expand All @@ -31,11 +28,6 @@ import {
* @typedef {import('@web3-storage/gateway-lib').DagulaContext} DagulaContext
*/

/**
* Temporary limit to the size of the response until memory leak is resolved.
*/
const MAX_CONTENT_LENGTH = 1024 * 1024 * 128

export default {
/** @type {import('@web3-storage/gateway-lib').Handler<import('@web3-storage/gateway-lib').Context, import('./bindings').Environment>} */
fetch (request, env, ctx) {
Expand All @@ -50,11 +42,8 @@ export default {
withHttpGet,
withParsedIpfsUrl,
withCarCids,
withMemoryBudget,
withDagula,
withFixedLengthStream,
withResponseMemoryRelease,
withMaxContentLength.bind(null, MAX_CONTENT_LENGTH)
withFixedLengthStream
)
return middleware(handler)(request, env, ctx)
}
Expand Down
47 changes: 27 additions & 20 deletions src/lib/block-batch.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
const MAX_BYTES_BETWEEN = 1024 * 1024 * 2
const MAX_BATCH_SIZE = 10

export class BlockBatch {
/** @type {number[]} */
#offsets = []
/**
* @typedef {import('multiformats').CID} CID
* @typedef {{ carCid: CID, blockCid: CID, offset: number }} BatchItem
* @typedef {{ add: (i: BatchItem) => void, next: () => BatchItem[] }} BlockBatcher
*/

/**
* Add an offset to the batch
* @param {number} offset
*/
add (offset) {
this.#offsets.push(offset)
/**
* Batcher for blocks in CARs. Batches are grouped by CAR CID and blocks are
* returned in batches in the order they were inserted.
* @implements {BlockBatcher}
*/
export class OrderedCarBlockBatcher {
/** @type {BatchItem[]} */
#queue = []

/** @param {BatchItem} item */
add (item) {
this.#queue.push(item)
}

next () {
const offsets = this.#offsets // .sort((a, b) => a - b)
if (!offsets.length) return
const batch = []
let last = offsets[0]
const queue = this.#queue
let prevItem = queue.shift()
if (!prevItem) return []
const batch = [prevItem]
while (true) {
const offset = offsets.shift()
if (!offset) break
if (offset - last >= MAX_BYTES_BETWEEN) {
offsets.unshift(offset) // not in this batch, return to pile
const item = queue.at(0)
if (!item) break
if (item.carCid.toString() !== prevItem.carCid.toString() || item.offset - prevItem.offset >= MAX_BYTES_BETWEEN) {
break
}
batch.push(offset)
batch.push(item)
queue.shift() // remove from the queue
if (batch.length >= MAX_BATCH_SIZE) break
last = offset
prevItem = item
}
this.#offsets = offsets
return batch
}
}
139 changes: 68 additions & 71 deletions src/lib/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { base58btc } from 'multiformats/bases/base58'
import defer from 'p-defer'
import { toIterable } from '@web3-storage/gateway-lib/util'
import { MultiCarIndex, StreamingCarIndex } from './car-index.js'
import { BlockBatch } from './block-batch.js'
import { OrderedCarBlockBatcher } from './block-batch.js'

/**
* @typedef {import('multiformats').CID} CID
Expand All @@ -13,7 +13,8 @@ import { BlockBatch } from './block-batch.js'
* @typedef {import('../bindings').R2Bucket} R2Bucket
*/

const MAX_BLOCK_LENGTH = 1024 * 1024 * 4
// 2MB (max safe libp2p block size) + typical block header length + some leeway
const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61

/**
* A blockstore that is backed by an R2 bucket which contains CARv2
Expand All @@ -25,11 +26,9 @@ export class R2Blockstore {
* @param {R2Bucket} dataBucket
* @param {R2Bucket} indexBucket
* @param {CID[]} carCids
* @param {import('./mem-budget.js').MemoryBudget} memoryBudget
*/
constructor (dataBucket, indexBucket, carCids, memoryBudget) {
constructor (dataBucket, indexBucket, carCids) {
this._dataBucket = dataBucket
this._memoryBudget = memoryBudget
this._idx = new MultiCarIndex()
for (const carCid of carCids) {
this._idx.addIndex(carCid, new StreamingCarIndex((async function * () {
Expand Down Expand Up @@ -72,10 +71,10 @@ export class R2Blockstore {

export class BatchingR2Blockstore extends R2Blockstore {
/** @type {Map<string, Array<import('p-defer').DeferredPromise<Block|undefined>>>} */
#batchBlocks = new Map()
#pendingBlocks = new Map()

/** @type {Map<CID, BlockBatch>} */
#batches = new Map()
/** @type {import('./block-batch.js').BlockBatcher} */
#batcher = new OrderedCarBlockBatcher()

#scheduled = false

Expand Down Expand Up @@ -109,69 +108,72 @@ export class BatchingR2Blockstore extends R2Blockstore {

async #processBatch () {
console.log('processing batch')
const batches = this.#batches
const batchBlocks = this.#batchBlocks
this.#batches = new Map()
this.#batchBlocks = new Map()
const batcher = this.#batcher
this.#batcher = new OrderedCarBlockBatcher()
const pendingBlocks = this.#pendingBlocks
this.#pendingBlocks = new Map()

while (true) {
const batch = batcher.next()
if (!batch.length) break

batch.sort((a, b) => a.offset - b.offset)

const { carCid } = batch[0]
const carPath = `${carCid}/${carCid}.car`
const range = {
offset: batch[0].offset,
length: batch[batch.length - 1].offset - batch[0].offset + MAX_ENCODED_BLOCK_LENGTH
}

for (const [carCid, batcher] of batches) {
console.log(`processing batch for ${carCid}`)
while (true) {
const batch = batcher.next()
if (!batch) break
const carPath = `${carCid}/${carCid}.car`
const range = { offset: batch[0], length: batch[batch.length - 1] - batch[0] + MAX_BLOCK_LENGTH }

console.log(`waiting allocation for ${range.length} bytes...`)
await this._memoryBudget.request(range.length)

console.log(`fetching ${batch.length} blocks from ${carCid} (${range.length} bytes @ ${range.offset})`)
const res = await this._dataBucket.get(carPath, { range })
if (!res) {
for (const blocks of batchBlocks.values()) {
blocks.forEach(b => b.resolve())
}
return
console.log(`fetching ${batch.length} blocks from ${carCid} (${range.length} bytes @ ${range.offset})`)
const res = await this._dataBucket.get(carPath, { range })
if (!res) {
// should not happen, but if it does, we resolve `undefined` for every
// block that is still pending (not only the blocks in this batch) and
// stop processing - no further batches are fetched.
for (const blocks of pendingBlocks.values()) {
blocks.forEach(b => b.resolve())
}
return
}

const reader = res.body.getReader()
const bytesReader = asyncIterableReader((async function * () {
while (true) {
const { done, value } = await reader.read()
if (done) return
yield value
}
})())

let bytesResolved = 0
const reader = res.body.getReader()
const bytesReader = asyncIterableReader((async function * () {
while (true) {
try {
const blockHeader = await readBlockHead(bytesReader)
const bytes = await bytesReader.exactly(blockHeader.blockLength)
bytesReader.seek(blockHeader.blockLength)

const key = mhToKey(blockHeader.cid.multihash.bytes)
const blocks = batchBlocks.get(key)
if (blocks) {
// console.log(`got wanted block for ${blockHeader.cid}`)
blocks.forEach(b => b.resolve({ cid: blockHeader.cid, bytes }))
batchBlocks.delete(key)
bytesResolved += bytes.length
}
} catch {
break
}
const { done, value } = await reader.read()
if (done) return
yield value
}
})())

console.log(`releasing ${range.length - bytesResolved} discarded bytes`)
// release the bytes we didn't send on (they are released later)
this._memoryBudget.release(range.length - bytesResolved)
reader.cancel()
while (true) {
try {
const blockHeader = await readBlockHead(bytesReader)
const bytes = await bytesReader.exactly(blockHeader.blockLength)
bytesReader.seek(blockHeader.blockLength)

const key = mhToKey(blockHeader.cid.multihash.bytes)
const blocks = pendingBlocks.get(key)
if (blocks) {
// console.log(`got wanted block for ${blockHeader.cid}`)
const block = { cid: blockHeader.cid, bytes }
blocks.forEach(b => b.resolve(block))
pendingBlocks.delete(key)
}
} catch {
break
}
}
console.log(`finished processing batch for ${carCid}`)
console.log(`${batchBlocks.size} blocks remain`)
// we should have read all the bytes from the reader by now but if the
// bytesReader throws for bad data _before_ the end then we need to
// cancel the reader - we don't need the rest.
reader.cancel()
}

// resolve `undefined` for any remaining blocks
for (const blocks of pendingBlocks.values()) {
blocks.forEach(b => b.resolve())
}
console.log('finished processing batch')
}

/** @param {CID} cid */
Expand All @@ -181,19 +183,14 @@ export class BatchingR2Blockstore extends R2Blockstore {
if (!multiIdxEntry) return

const [carCid, entry] = multiIdxEntry
let batch = this.#batches.get(carCid)
if (!batch) {
batch = new BlockBatch()
this.#batches.set(carCid, batch)
}
batch.add(entry.offset)
this.#batcher.add({ carCid, blockCid: cid, offset: entry.offset })

if (!entry.multihash) throw new Error('missing entry multihash')
const key = mhToKey(entry.multihash.bytes)
let blocks = this.#batchBlocks.get(key)
let blocks = this.#pendingBlocks.get(key)
if (!blocks) {
blocks = []
this.#batchBlocks.set(key, blocks)
this.#pendingBlocks.set(key, blocks)
}
const deferred = defer()
blocks.push(deferred)
Expand Down
Loading

0 comments on commit bf11d0c

Please sign in to comment.