Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for sync car reading #121

Merged
merged 8 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@
"browser": "./src/reader-browser.js",
"import": "./src/reader.js"
},
"./reader-sync": {
"types": "./dist/src/reader-sync-browser.d.ts",
"import": "./src/reader-sync-browser.js"
},
"./writer": {
"types": "./dist/src/writer.d.ts",
"browser": "./src/writer-browser.js",
Expand Down
17 changes: 12 additions & 5 deletions src/api.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import type { CID } from 'multiformats/cid'

export type Promisable<T> = T | PromiseLike<T>
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved

/**
* Literally any `Iterable` (async or regular).
*/
export type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved

export type { CID }
/* Generic types for interfacing with block storage */

Expand All @@ -21,18 +28,18 @@ export interface BlockIndex extends BlockHeader {

export interface RootsReader {
version: number
getRoots: () => Promise<CID[]>
getRoots: () => Promisable<CID[]>
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved
}

export interface BlockIterator extends AsyncIterable<Block> {}

export interface CIDIterator extends AsyncIterable<CID> {}

export interface BlockReader {
has: (key: CID) => Promise<boolean>
get: (key: CID) => Promise<Block | undefined>
blocks: () => BlockIterator
cids: () => CIDIterator
has: (key: CID) => Promisable<boolean>
get: (key: CID) => Promisable<Block | undefined>
blocks: () => AnyIterable<Block>
cids: () => AnyIterable<CID>
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved
}

export interface BlockWriter {
Expand Down
16 changes: 13 additions & 3 deletions src/coding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,22 @@ export interface CarDecoder {
blocksIndex: () => AsyncGenerator<BlockIndex>
}

export interface BytesReader {
export interface Seekable {
seek: (length: number) => void
}

export interface BytesReader extends Seekable {
upTo: (length: number) => Promise<Uint8Array>

exactly: (length: number) => Promise<Uint8Array>
exactly: (length: number, seek?: boolean) => Promise<Uint8Array>

seek: (length: number) => void
pos: number
}

export interface BytesReaderSync extends Seekable{
hugomrdias marked this conversation as resolved.
Show resolved Hide resolved
upTo: (length: number) => Uint8Array

exactly: (length: number, seek?: boolean) => Uint8Array

pos: number
}
86 changes: 86 additions & 0 deletions src/decoder-common.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import varint from 'varint'

// Byte values used when detecting and building CIDv0 identifiers: callers
// compare the first two bytes of a CID against SHA2_256 and LENGTH, and pass
// DAG_PB as the codec when creating the resulting CID.
export const CIDV0_BYTES = {
// leading multihash code byte of a CIDv0 (sha2-256)
SHA2_256: 0x12,
// multihash digest-length byte of a CIDv0 (0x20 = 32 bytes)
LENGTH: 0x20,
// codec used for every CIDv0
DAG_PB: 0x70
}

// Size in bytes of the fixed-width CARv2 header: 16 + 8 + 8 + 8 = 40. This is
// the number of bytes `decodeV2Header` reads (five little-endian 64-bit words).
export const V2_HEADER_LENGTH = /* characteristics */ 16 /* v1 offset */ + 8 /* v1 size */ + 8 /* index offset */ + 8

/**
 * Decodes a single unsigned varint from the start of `bytes` and advances
 * `seeker` by the number of bytes the varint occupied.
 *
 * The caller is responsible for fetching the bytes first, which lets the same
 * helper serve both sync and async readers:
 *
 * ```js
 * const bytes = reader.upTo(8) // maybe async
 * const value = decodeVarint(bytes, reader)
 * ```
 *
 * @param {Uint8Array} bytes - bytes that start with the varint to decode
 * @param {import('./coding').Seekable} seeker - reader to advance past the varint
 * @returns {number} the decoded value
 * @throws {Error} 'Unexpected end of data' when `bytes` is empty
 */
export function decodeVarint (bytes, seeker) {
  if (!bytes.length) {
    throw new Error('Unexpected end of data')
  }

  const value = varint.decode(bytes)
  // `varint.decode.bytes` reports how many bytes the last decode consumed
  const consumed = /** @type {number} */(varint.decode.bytes)
  seeker.seek(consumed)

  return value
  /* c8 ignore next 2 */
  // Node.js 12 c8 bug
}

/**
 * Decodes the fixed-width CARv2 header from raw bytes.
 *
 * The layout is five little-endian unsigned 64-bit words: two words of
 * characteristics, followed by the data offset, the data size and the index
 * offset. The caller must read the bytes first:
 *
 * ```js
 * const bytes = reader.exactly(V2_HEADER_LENGTH, true) // maybe async
 * const header = decodeV2Header(bytes)
 * ```
 *
 * @param {Uint8Array} bytes - at least 40 bytes holding the fixed header
 * @returns {import('./coding').CarV2FixedHeader}
 */
export function decodeV2Header (bytes) {
  const view = new DataView(bytes.buffer, bytes.byteOffset, bytes.byteLength)

  /**
   * Reads one little-endian u64 at the given byte offset.
   *
   * @param {number} at
   */
  const readU64 = (at) => view.getBigUint64(at, true)

  /** @type {[bigint, bigint]} */
  const characteristics = [readU64(0), readU64(8)]

  // offsets and sizes are surfaced as plain numbers
  return {
    version: 2,
    characteristics,
    dataOffset: Number(readU64(16)),
    dataSize: Number(readU64(24)),
    indexOffset: Number(readU64(32))
  }
  /* c8 ignore next 2 */
  // Node.js 12 c8 bug
}

/**
 * Computes the total encoded length of the multihash that starts at the
 * beginning of `bytes`, so the caller knows how many bytes to read next.
 *
 * The caller must peek at the bytes first:
 *
 * ```js
 * const bytes = reader.upTo(8) // maybe async
 * const mhLength = getMultihashLength(bytes)
 * ```
 *
 * @param {Uint8Array} bytes - bytes starting with the multihash prefix
 * @returns {number} prefix length (code varint + length varint) plus digest length
 */
export function getMultihashLength (bytes) {
  // A multihash is laid out as | code | length | digest... | where code and
  // length are both varints, so each must be decoded to learn its own size.

  // the code value itself is not needed, only how many bytes it occupied
  varint.decode(bytes)
  const codeSize = /** @type {number} */(varint.decode.bytes)

  const digestSize = varint.decode(bytes.subarray(codeSize))
  const lengthSize = /** @type {number} */(varint.decode.bytes)

  return codeSize + lengthSize + digestSize
}
228 changes: 228 additions & 0 deletions src/decoder-sync.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import { CID } from 'multiformats/cid'
import * as Digest from 'multiformats/hashes/digest'
import { decode as decodeDagCbor } from '@ipld/dag-cbor'
import { CarHeader as headerValidator } from './header-validator.js'
import { CIDV0_BYTES, decodeV2Header, decodeVarint, getMultihashLength, V2_HEADER_LENGTH } from './decoder-common.js'

/**
* @typedef {import('./api').Block} Block
* @typedef {import('./api').BlockHeader} BlockHeader
* @typedef {import('./api').BlockIndex} BlockIndex
* @typedef {import('./coding').BytesReaderSync} BytesReaderSync
* @typedef {import('./coding').CarHeader} CarHeader
* @typedef {import('./coding').CarV2Header} CarV2Header
* @typedef {import('./coding').CarV2FixedHeader} CarV2FixedHeader
*/

/**
 * Synchronously reads the CAR header from a `BytesReaderSync`. The result is
 * a `CarHeader` for CARv1 data, or a `CarV2Header` (the inner v1 header merged
 * with the fixed v2 header fields) for CARv2 data.
 *
 * @name decoder.readHeader(reader)
 * @param {BytesReaderSync} reader - reader positioned at the start of the header
 * @param {number} [strictVersion] - when given, the only version accepted
 * @returns {CarHeader | CarV2Header}
 * @throws {Error} on a zero-length header, a header that fails validation, or
 * an unsupported / unexpected version
 */
export function readHeader (reader, strictVersion) {
  const headerLength = decodeVarint(reader.upTo(8), reader)
  if (headerLength === 0) {
    throw new Error('Invalid CAR header (zero length)')
  }

  const block = decodeDagCbor(reader.exactly(headerLength, true))
  if (!headerValidator(block)) {
    throw new Error('Invalid CAR header format')
  }

  const supported = block.version === 1 || block.version === 2
  const strict = strictVersion !== undefined
  if (!supported || (strict && block.version !== strictVersion)) {
    const expected = strict ? ` (expected ${strictVersion})` : ''
    throw new Error(`Invalid CAR version: ${block.version}${expected}`)
  }

  // 'roots' is optional in the schema so that the version check above can run
  // before the block is rejected; enforce it here instead: a v1 header must
  // carry roots and a v2 pragma must not.
  const hasRoots = Array.isArray(block.roots)
  const rootsExpected = block.version === 1
  if (hasRoots !== rootsExpected) {
    throw new Error('Invalid CAR header format')
  }

  if (block.version === 2) {
    const v2Header = decodeV2Header(reader.exactly(V2_HEADER_LENGTH, true))
    // jump to where the embedded v1 payload starts, then read its header
    reader.seek(v2Header.dataOffset - reader.pos)
    const v1Header = readHeader(reader, 1)
    return Object.assign(v1Header, v2Header)
  }

  return block
  /* c8 ignore next 2 */
  // Node.js 12 c8 bug
}

/**
 * Synchronously reads one CID from the reader and advances past it.
 *
 * Two encodings are handled:
 * - CIDv0: recognised by its first two bytes (sha2-256 code followed by a
 *   32-byte length); the whole 34-byte multihash is consumed.
 * - CIDv1: a version varint, a codec varint, then a multihash.
 *
 * @param {BytesReaderSync} reader - reader positioned at the start of a CID
 * @returns {CID}
 * @throws {Error} when the CID version is not 0 or 1, or the data ends early
 */
function readCid (reader) {
  // peek only (seek = false): these bytes are re-read below as part of the CID
  const first = reader.exactly(2, false)
  if (first[0] === CIDV0_BYTES.SHA2_256 && first[1] === CIDV0_BYTES.LENGTH) {
    // cidv0 32-byte sha2-256: 2 prefix bytes + 32 digest bytes
    const bytes = reader.exactly(34, true)
    const multihash = Digest.decode(bytes)
    return CID.create(0, CIDV0_BYTES.DAG_PB, multihash)
  }

  const version = decodeVarint(reader.upTo(8), reader)
  if (version !== 1) {
    throw new Error(`Unexpected CID version (${version})`)
  }
  const codec = decodeVarint(reader.upTo(8), reader)
  // the multihash length is only known after decoding its two varint prefixes
  const bytes = reader.exactly(getMultihashLength(reader.upTo(8)), true)
  const multihash = Digest.decode(bytes)
  return CID.create(version, codec, multihash)
  /* c8 ignore next 2 */
  // Node.js 12 c8 bug
}

/**
 * Synchronously reads the leading data of one CAR section (length prefix and
 * CID) from a `BytesReaderSync`, leaving the reader positioned at the start of
 * the block's binary data.
 *
 * The returned `BlockHeader` is `{ cid, length, blockLength }`, where `length`
 * is the size of the whole section including its varint prefix and
 * `blockLength` is the size of the block data that follows the CID.
 *
 * @name decoder.readBlockHead(reader)
 * @param {BytesReaderSync} reader - reader positioned at the start of a section
 * @returns {BlockHeader}
 * @throws {Error} 'Invalid CAR section (zero length)' when the prefix is zero
 */
export function readBlockHead (reader) {
  const sectionStart = reader.pos

  // the prefix counts CID + block data; the CID itself has a variable length
  const payloadLength = decodeVarint(reader.upTo(8), reader)
  if (payloadLength === 0) {
    throw new Error('Invalid CAR section (zero length)')
  }

  // include the bytes taken by the varint prefix in the total section length
  const length = payloadLength + (reader.pos - sectionStart)

  const cid = readCid(reader)

  // everything consumed so far (prefix + CID) is not part of the block data
  const headLength = Number(reader.pos - sectionStart)
  const blockLength = length - headLength

  return { cid, length, blockLength }
  /* c8 ignore next 2 */
  // Node.js 12 c8 bug
}

/**
 * Synchronously decodes a complete CAR archive held in memory.
 *
 * The header is read first; for a CARv2 archive, reading is then restricted to
 * the embedded v1 payload so that anything after it is not parsed as blocks.
 * Every section is decoded eagerly and collected into an array.
 *
 * @param {Uint8Array} bytes - the full CAR archive
 * @returns {{ header : CarHeader | CarV2Header , blocks: Block[]}}
 * @throws {Error} when the header or any section is malformed or truncated
 */
export function fromBytes (bytes) {
  let reader = bytesReader(bytes)
  const header = readHeader(reader)
  if (header.version === 2) {
    // the inner v1 header has already been consumed, so only the rest of the
    // v1 payload may still be read
    const v1length = reader.pos - header.dataOffset
    reader = limitReader(reader, header.dataSize - v1length)
  }

  const blocks = []
  // an empty peek means the (possibly limited) payload is exhausted
  while (reader.upTo(8).length > 0) {
    const { cid, blockLength } = readBlockHead(reader)

    blocks.push({ cid, bytes: reader.exactly(blockLength, true) })
  }

  return {
    header, blocks
  }
}

/**
 * Creates a synchronous `BytesReaderSync` over an in-memory `Uint8Array`.
 *
 * The reader keeps a cursor into `bytes`. Returned chunks are `subarray`
 * views over the input rather than copies.
 *
 * @name decoder.bytesReader(bytes)
 * @param {Uint8Array} bytes - the data to read from
 * @returns {BytesReaderSync}
 */
export function bytesReader (bytes) {
  let cursor = 0

  /** @type {BytesReaderSync} */
  const reader = {
    // Peeks at up to `length` bytes without moving the cursor; returns fewer
    // (possibly zero) bytes when the data runs out.
    upTo (length) {
      const available = bytes.length - cursor
      const take = Math.min(length, available)
      return bytes.subarray(cursor, cursor + take)
      /* c8 ignore next 2 */
      // Node.js 12 c8 bug
    },

    // Returns exactly `length` bytes or throws; only moves the cursor when
    // `seek` is true.
    exactly (length, seek = false) {
      const available = bytes.length - cursor
      if (length > available) {
        throw new Error('Unexpected end of data')
      }

      const start = cursor
      if (seek) {
        cursor = start + length
      }
      return bytes.subarray(start, start + length)
      /* c8 ignore next 2 */
      // Node.js 12 c8 bug
    },

    // Moves the cursor by `length` bytes relative to its current position.
    seek (length) {
      cursor += length
    },

    get pos () {
      return cursor
    }
  }

  return reader
}

/**
* Wraps a `BytesReader` in a limiting `BytesReader` which limits maximum read
* to `byteLimit` bytes. It _does not_ update `pos` of the original
* `BytesReader`.
*
* @name decoder.limitReader(reader, byteLimit)
* @param {BytesReaderSync} reader
* @param {number} byteLimit
* @returns {BytesReaderSync}
*/
export function limitReader (reader, byteLimit) {
let bytesRead = 0

/** @type {BytesReaderSync} */
return {
upTo (length) {
let bytes = reader.upTo(length)
if (bytes.length + bytesRead > byteLimit) {
bytes = bytes.subarray(0, byteLimit - bytesRead)
}
return bytes
/* c8 ignore next 2 */
// Node.js 12 c8 bug
},

exactly (length, seek = false) {
const bytes = reader.exactly(length, seek)
if (bytes.length + bytesRead > byteLimit) {
throw new Error('Unexpected end of data')
}
if (seek) {
bytesRead += length
}
return bytes
/* c8 ignore next 2 */
// Node.js 12 c8 bug
},

seek (length) {
bytesRead += length
reader.seek(length)
},

get pos () {
return reader.pos
}
}
}
Loading