feat: serve CAR blocks #68

Merged 3 commits on Jul 31, 2023
Changes from 2 commits
22 changes: 14 additions & 8 deletions package-lock.json

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions package.json
@@ -27,11 +27,12 @@
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-json": "^10.0.1",
"@ipld/dag-pb": "^4.0.2",
- "@web3-storage/content-claims": "^2.1.1",
- "@web3-storage/gateway-lib": "^3.2.4",
+ "@web3-storage/content-claims": "^3.0.1",
+ "@web3-storage/gateway-lib": "^3.3.2",
"cardex": "^2.3.1",
"chardet": "^1.5.0",
"dagula": "^7.0.0",
+ "http-range-parse": "^1.0.0",
"lnmap": "^1.0.1",
"magic-bytes.js": "^1.0.12",
"mrmime": "^1.0.1",
1 change: 1 addition & 0 deletions src/bindings.d.ts
@@ -26,6 +26,7 @@ export interface SimpleBucket {
export interface SimpleBucketObject {
readonly key: string
readonly body: ReadableStream
+ arrayBuffer(): Promise<ArrayBuffer>
}

export interface IndexSource {
82 changes: 82 additions & 0 deletions src/handlers/car-block.js
@@ -0,0 +1,82 @@
/* eslint-env browser */
/* global FixedLengthStream */
import { HttpError } from '@web3-storage/gateway-lib/util'
import { CAR_CODE } from '../constants.js'
import * as Http from '../lib/http.js'

/** @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} CarBlockHandlerContext */

/**
* Handler that serves CAR files directly from R2.
*
* @type {import('@web3-storage/gateway-lib').Handler<CarBlockHandlerContext, import('../bindings').Environment>}
*/
export async function handleCarBlock (request, env, ctx) {
const { searchParams, dataCid } = ctx
if (!dataCid) throw new Error('missing data CID')
if (!searchParams) throw new Error('missing URL search params')

if (request.method !== 'HEAD' && request.method !== 'GET') {
throw new HttpError('method not allowed', { status: 405 })
}
if (dataCid.code !== CAR_CODE) {
throw new HttpError('not a CAR CID', { status: 400 })
}

const etag = `"${dataCid}"`
if (request.headers.get('If-None-Match') === etag) {
return new Response(null, { status: 304 })
}

if (request.method === 'HEAD') {
const obj = await env.CARPARK.head(`${dataCid}/${dataCid}.car`)
if (!obj) throw new HttpError('CAR not found', { status: 404 })
return new Response(undefined, {
headers: {
'Accept-Ranges': 'bytes',
'Content-Length': obj.size.toString(),
Etag: etag
}
})
}

/** @type {import('../lib/http').Range|undefined} */
let range
if (request.headers.has('range')) {
try {
range = Http.parseRange(request.headers.get('range') ?? '')
} catch (err) {
throw new HttpError('invalid range', { status: 400, cause: err })
}
}

const obj = await env.CARPARK.get(`${dataCid}/${dataCid}.car`, { range })
Review comment (Contributor):
Could be cool to try this with roundabout. I think it should work and let us redirect to a presigned URL.

Probably good to keep as is and track an issue to attempt the cost optimisation :)

Reply (Member, Author):
Hmm, yes but we probably don't want to give out those URLs to anyone via the gateway. Someone will try to use/abuse and they also expire...

Reply (Contributor):
So the usage would be the same as here? It would be a redirect whose response is read right away, right? So expiration would not be a problem, and if we perform the request to roundabout within the worker and redirect to the presigned URL, then the user will not even know how the URL was created.

Use/abuse would be the same as via freeway, no? So rate limits are really the solution we have to avoid abuse.

if (!obj) throw new HttpError('CAR not found', { status: 404 })

const status = range ? 206 : 200
const headers = new Headers({
'Content-Type': 'application/vnd.ipld.car; version=1;',
'X-Content-Type-Options': 'nosniff',
'Cache-Control': 'public, max-age=29030400, immutable',
'Content-Disposition': `attachment; filename="${dataCid}.car"`,
Etag: etag
})

let contentLength = obj.size
if (range) {
let first, last
if ('suffix' in range) {
first = obj.size - range.suffix
last = obj.size - 1
} else {
first = range.offset || 0
last = range.length != null ? first + range.length - 1 : obj.size - 1
}
headers.set('Content-Range', `bytes ${first}-${last}/${obj.size}`)
contentLength = last - first + 1 // byte ranges are inclusive at both ends
}
headers.set('Content-Length', contentLength.toString())

// @ts-expect-error ReadableStream types incompatible
return new Response(obj.body.pipeThrough(new FixedLengthStream(contentLength)), { status, headers })
}
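For reference, the range arithmetic in the handler can be checked in isolation. This is a minimal sketch, assuming a hypothetical `resolveRange` helper (not part of this PR) that takes the same range shape `parseRange` returns plus the object size. HTTP byte ranges are inclusive at both ends, so a range `first-last` covers `last - first + 1` bytes.

```javascript
/**
 * Hypothetical helper, not part of this PR: resolve a parsed range against
 * the total object size and derive the response header values.
 *
 * @param {{ offset?: number, length?: number } | { suffix: number }} range
 * @param {number} size Total size of the object in bytes.
 */
function resolveRange (range, size) {
  let first, last
  if ('suffix' in range) {
    // "last N bytes" form, e.g. Range: bytes=-500
    first = size - range.suffix
    last = size - 1
  } else {
    first = range.offset || 0
    last = range.length != null ? first + range.length - 1 : size - 1
  }
  return {
    first,
    last,
    // both ends are inclusive, hence the +1
    contentLength: last - first + 1,
    contentRange: `bytes ${first}-${last}/${size}`
  }
}

const resolved = resolveRange({ offset: 0, length: 100 }, 1000)
console.log(resolved.contentRange, resolved.contentLength)
```

With a 1000 byte object, `{ offset: 0, length: 100 }` resolves to `bytes 0-99/1000` with a content length of 100, and `{ suffix: 500 }` resolves to `bytes 500-999/1000` with a content length of 500.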
10 changes: 6 additions & 4 deletions src/index.js
@@ -18,8 +18,9 @@ import {
import {
withDagula,
withIndexSources,
- withUnsupportedFeaturesHandler,
- withVersionHeader
+ withHttpRangeUnsupported,
+ withVersionHeader,
+ withCarHandler
} from './middleware.js'

/**
@@ -39,9 +40,10 @@ export default {
withVersionHeader,
withContentDispositionHeader,
withErrorHandler,
- withUnsupportedFeaturesHandler,
- withHttpGet,
withParsedIpfsUrl,
+ withCarHandler,
+ withHttpRangeUnsupported,
+ withHttpGet,
withIndexSources,
withDagula,
withFixedLengthStream
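
The ordering in this list appears to matter: `withCarHandler` sits before `withHttpRangeUnsupported`, so requests for CAR CIDs are answered (range requests included) before the range guard can reject them with 501. A minimal sketch of that behaviour follows. It uses a hypothetical `compose` helper and stub handlers that return plain objects instead of `Response` instances, and it assumes that the first middleware listed wraps all the others and that `CAR_CODE` is the CAR multicodec code `0x0202`.

```javascript
// Assumption: the value exported from ../constants.js is the CAR multicodec code.
const CAR_CODE = 0x0202

// Hypothetical stand-in for the real composition helper:
// the first middleware listed becomes the outermost wrapper.
const compose = (...middlewares) => handler =>
  middlewares.reduceRight((next, middleware) => middleware(next), handler)

// Stub: serve CAR CIDs directly, delegate everything else.
const withCarHandler = next => (request, env, ctx) =>
  ctx.dataCid.code === CAR_CODE
    ? { status: request.headers.has('range') ? 206 : 200, servedBy: 'car' }
    : next(request, env, ctx)

// Stub: reject range requests that reach this point.
const withHttpRangeUnsupported = next => (request, env, ctx) =>
  request.headers.has('range')
    ? { status: 501, servedBy: 'range-guard' }
    : next(request, env, ctx)

const serve = compose(withCarHandler, withHttpRangeUnsupported)(
  () => ({ status: 200, servedBy: 'dag' })
)

// A Map is enough to model the `has`/`get` calls used above.
const rangeRequest = { headers: new Map([['range', 'bytes=0-99']]) }
const plainRequest = { headers: new Map() }

const carResult = serve(rangeRequest, {}, { dataCid: { code: CAR_CODE } })
const dagRangeResult = serve(rangeRequest, {}, { dataCid: { code: 0x70 } })
const dagResult = serve(plainRequest, {}, { dataCid: { code: 0x70 } })
console.log(carResult.status, dagRangeResult.status, dagResult.status)
```

Swapping the two middlewares in the `compose` call would make the range request for the CAR CID fail with 501 as well.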
14 changes: 11 additions & 3 deletions src/lib/bucket.js
@@ -26,15 +26,23 @@ export class CachingBucket {
async get (key) {
// > the cache key requires a TLD to be present in the URL
const cacheKey = new URL(key, 'http://cache.freeway.dag.haus')
- const res = await this.#cache.match(cacheKey)
- if (res && res.body) return { key, body: res.body }
+ const cacheRes = await this.#cache.match(cacheKey)
+ if (cacheRes && cacheRes.body) return { key, body: cacheRes.body, arrayBuffer: () => cacheRes.arrayBuffer() }
const obj = await this.#source.get(key)
if (!obj) return null
const [body0, body1] = obj.body.tee()
this.#ctx.waitUntil(this.#cache.put(cacheKey, new Response(body1, {
headers: { 'Cache-Control': `max-age=${MAX_AGE}` }
})))
- return { key, body: body0 }
+ const res = new Response(body0)
+ return {
+ key,
+ get body () {
+ if (!res.body) throw new Error('missing body')
+ return res.body
+ },
+ arrayBuffer: () => res.arrayBuffer()
+ }
}
}
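
The `CachingBucket.get` change wraps the teed stream in a `Response` so that one object can expose both a streaming `body` and an `arrayBuffer()` convenience method. A minimal sketch of that pattern in isolation, assuming a runtime with the WHATWG `ReadableStream` and `Response` globals (Workers, or Node.js 18+); `toBucketObject` is a hypothetical name used only for illustration.

```javascript
/**
 * Hypothetical helper: adapt a ReadableStream to the SimpleBucketObject shape.
 *
 * @param {string} key
 * @param {ReadableStream} stream
 */
function toBucketObject (key, stream) {
  // Response does the buffering work for arrayBuffer()
  const res = new Response(stream)
  return {
    key,
    get body () {
      if (!res.body) throw new Error('missing body')
      return res.body
    },
    arrayBuffer: () => res.arrayBuffer()
  }
}

// A tiny in-memory stream standing in for an R2 object body.
const source = new ReadableStream({
  start (controller) {
    controller.enqueue(new TextEncoder().encode('hello'))
    controller.close()
  }
})

const bucketObject = toBucketObject('example-key', source)
const pendingText = bucketObject.arrayBuffer().then(buf => new TextDecoder().decode(buf))
pendingText.then(text => console.log(text))
```

The underlying body can only be consumed once, so reading `body` and calling `arrayBuffer()` on the same object are alternatives rather than complements.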

20 changes: 20 additions & 0 deletions src/lib/car.js
@@ -0,0 +1,20 @@
import { CarReader } from '@ipld/car'
import { CAR_CODE } from '../constants.js'

export const code = CAR_CODE

/**
* @param {Uint8Array} bytes
* @returns {Promise<Array<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }>>}
*/
export async function decode (bytes) {
const reader = await CarReader.fromBytes(bytes)
const blocks = []
for await (const b of reader.blocks()) {
blocks.push({
cid: /** @type {import('multiformats').UnknownLink} */ (b.cid),
bytes: b.bytes
})
}
return blocks
}
28 changes: 23 additions & 5 deletions src/lib/dag-index/content-claims.js
@@ -4,7 +4,7 @@ import * as raw from 'multiformats/codecs/raw'
import * as Claims from '@web3-storage/content-claims/client'
import { MultihashIndexSortedReader } from 'cardex/multihash-index-sorted'
import { Map as LinkMap } from 'lnmap'
- import { CAR_CODE } from '../../constants'
+ import * as CAR from '../car.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
@@ -14,6 +14,11 @@ import { CAR_CODE } from '../../constants'

/** @implements {Index} */
export class ContentClaimsIndex {
+ /**
+ * Index store.
+ * @type {import('../../bindings').SimpleBucket}
+ */
+ #bucket
/**
* Cached index entries.
* @type {Map<UnknownLink, IndexEntry>}
@@ -39,9 +44,11 @@ export class ContentClaimsIndex {
#serviceURL

/**
+ * @param {import('../../bindings').SimpleBucket} bucket Bucket that stores CARs.
* @param {{ serviceURL?: URL }} [options]
*/
- constructor (options) {
+ constructor (bucket, options) {
+ this.#bucket = bucket
this.#cache = new LinkMap()
this.#claimFetched = new LinkMap()
this.#serviceURL = options?.serviceURL
@@ -92,21 +99,32 @@ export class ContentClaimsIndex {
// and we don't serve anything that we don't have in our own bucket.
if (claim.type !== 'assert/relation') continue

- // export the blocks from the claim - should include the CARv2 indexes
+ // export the blocks from the claim - may include the CARv2 indexes
const blocks = [...claim.export()]

// each part is a tuple of CAR CID (content) & CARv2 index CID (includes)
for (const { content, includes } of claim.parts) {
if (!isCARLink(content)) continue
+ if (!includes) continue

- const block = blocks.find(b => b.cid.toString() === includes.toString())
+ /** @type {{ cid: import('multiformats').UnknownLink, bytes: Uint8Array }|undefined} */
+ let block = blocks.find(b => b.cid.toString() === includes.content.toString())
+
+ // if the index is not included in the claim, it should be in CARPARK
+ if (!block && includes.parts?.length) {
+ const obj = await this.#bucket.get(`${includes.parts[0]}/${includes.parts[0]}.car`)
+ if (!obj) continue
+ const blocks = await CAR.decode(new Uint8Array(await obj.arrayBuffer()))
+ block = blocks.find(b => b.cid.toString() === includes.content.toString())
+ }
if (!block) continue

const entries = await decodeIndex(content, block.bytes)
for (const entry of entries) {
this.#cache.set(Link.create(raw.code, entry.multihash), entry)
}
}
+ break
}
this.#claimFetched.set(cid, true)
}
@@ -116,7 +134,7 @@ export class ContentClaimsIndex {
* @param {import('multiformats').Link} cid
* @returns {cid is import('cardex/api').CARLink}
*/
- const isCARLink = cid => cid.code === CAR_CODE
+ const isCARLink = cid => cid.code === CAR.code

/**
* Read a MultihashIndexSorted index for the passed origin CAR and return a
19 changes: 19 additions & 0 deletions src/lib/http.js
@@ -0,0 +1,19 @@
// @ts-expect-error no types
import httpRangeParse from 'http-range-parse'

/** @typedef {{ offset: number, length?: number } | { offset?: number, length: number } | { suffix: number }} Range */

/**
* Convert a HTTP Range header to a range object.
* @param {string} value
* @returns {Range}
*/
export function parseRange (value) {
const result = httpRangeParse(value)
if (result.ranges) throw new Error('Multipart ranges not supported')
const { unit, first, last, suffix } = result
if (unit !== 'bytes') throw new Error(`Unsupported range unit: ${unit}`)
return suffix != null
? { suffix }
: { offset: first, length: last != null ? last - first + 1 : undefined }
}
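
To make the mapping from header value to range object concrete, here is a self-contained sketch that mirrors the shape `parseRange` returns without depending on `http-range-parse`. The regex-based `parseSingleByteRange` is a hypothetical stand-in, not the library's implementation, and it only accepts a single `bytes` range.

```javascript
/**
 * Hypothetical stand-in for parseRange: supports a single "bytes" range only.
 *
 * @param {string} value Raw value of the HTTP Range header.
 */
function parseSingleByteRange (value) {
  const match = /^bytes=(\d*)-(\d*)$/.exec(value.trim())
  if (!match) throw new Error(`unsupported or invalid range: ${value}`)
  const [, first, last] = match
  if (first === '' && last === '') throw new Error(`invalid range: ${value}`)
  // "bytes=-500" asks for the final 500 bytes
  if (first === '') return { suffix: Number(last) }
  const offset = Number(first)
  // "bytes=100-" is open-ended, so there is no length
  if (last === '') return { offset, length: undefined }
  const end = Number(last)
  if (end < offset) throw new Error(`invalid range: ${value}`)
  // first and last are both inclusive
  return { offset, length: end - offset + 1 }
}

console.log(parseSingleByteRange('bytes=0-99')) // { offset: 0, length: 100 }
console.log(parseSingleByteRange('bytes=100-')) // { offset: 100, length: undefined }
console.log(parseSingleByteRange('bytes=-500')) // { suffix: 500 }
```

Multipart values such as `bytes=0-1,5-9` and other units such as `items=0-5` do not match the pattern and throw, which corresponds to the two error cases in `parseRange`.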
24 changes: 21 additions & 3 deletions src/middleware.js
@@ -8,6 +8,7 @@ import { ContentClaimsIndex } from './lib/dag-index/content-claims.js'
import { MultiCarIndex, StreamingCarIndex } from './lib/dag-index/car.js'
import { CachingBucket, asSimpleBucket } from './lib/bucket.js'
import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js'
+ import { handleCarBlock } from './handlers/car-block.js'

/**
* @typedef {import('./bindings').Environment} Environment
@@ -17,11 +18,11 @@ import { MAX_CAR_BYTES_IN_MEMORY, CAR_CODE } from './constants.js'
*/

/**
- * Validates the request does not contain unsupported features.
+ * Validates the request does not contain a HTTP `Range` header.
* Returns 501 Not Implemented in case it has.
* @type {import('@web3-storage/gateway-lib').Middleware<import('@web3-storage/gateway-lib').Context>}
*/
- export function withUnsupportedFeaturesHandler (handler) {
+ export function withHttpRangeUnsupported (handler) {
return (request, env, ctx) => {
// Range request https://github.com/web3-storage/gateway-lib/issues/12
if (request.headers.get('range')) {
@@ -32,6 +33,23 @@ export function withUnsupportedFeaturesHandler (handler) {
}
}

+ /**
+ * Middleware that will serve CAR files if a CAR codec is found in the path
+ * CID. If the CID is not a CAR CID it delegates to the next middleware.
+ *
+ * @type {import('@web3-storage/gateway-lib').Middleware<IpfsUrlContext, IpfsUrlContext, Environment>}
+ */
+ export function withCarHandler (handler) {
+ return async (request, env, ctx) => {
+ const { dataCid } = ctx
+ if (!dataCid) throw new Error('missing data CID')
+ if (dataCid.code !== CAR_CODE) {
+ return handler(request, env, ctx) // pass to other handlers
+ }
+ return handleCarBlock(request, env, ctx)
+ }
+ }
+
/**
* Extracts a set of index sources from search params from the URL querystring
* or DUDEWHERE bucket.
@@ -143,7 +161,7 @@ export function withDagula (handler) {
blockstore = new BatchingR2Blockstore(env.CARPARK, index)
}
} else {
- const index = new ContentClaimsIndex({
+ const index = new ContentClaimsIndex(asSimpleBucket(env.CARPARK), {
serviceURL: env.CONTENT_CLAIMS_SERVICE_URL ? new URL(env.CONTENT_CLAIMS_SERVICE_URL) : undefined
})
const found = await index.get(dataCid)