Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

fix: simplify transport interface, update interfaces for use with libp2p #180

Merged
merged 3 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/libp2p-connection/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@
"test:electron-main": "npm run test -- -t electron-main"
},
"dependencies": {
"@libp2p/interfaces": "^1.3.12",
"@libp2p/interfaces": "^1.3.0",
"@multiformats/multiaddr": "^10.1.5",
"err-code": "^3.0.1"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.1.14",
"@libp2p/peer-id-factory": "^1.0.6",
"@libp2p/interface-compliance-tests": "^1.1.0",
"@libp2p/peer-id-factory": "^1.0.0",
"aegir": "^36.1.3",
"it-pair": "^2.0.2"
}
Expand Down
47 changes: 18 additions & 29 deletions packages/libp2p-connection/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import type { Multiaddr } from '@multiformats/multiaddr'
import errCode from 'err-code'
import { OPEN, CLOSING, CLOSED } from '@libp2p/interfaces/connection/status'
import type { ConnectionStat, Metadata, ProtocolStream, Stream } from '@libp2p/interfaces/connection'
import { symbol } from '@libp2p/interfaces/connection'
import type { Connection, ConnectionStat, Metadata, ProtocolStream, Stream } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'

const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')

interface ConnectionInit {
localAddr: Multiaddr
remoteAddr: Multiaddr
localPeer: PeerId
remotePeer: PeerId
newStream: (protocols: string[]) => Promise<ProtocolStream>
close: () => Promise<void>
Expand All @@ -21,23 +18,15 @@ interface ConnectionInit {
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
export class Connection {
export class ConnectionImpl implements Connection {
/**
* Connection identifier.
*/
public readonly id: string
/**
* Observed multiaddr of the local peer
*/
public readonly localAddr: Multiaddr
/**
* Observed multiaddr of the remote peer
*/
public readonly remoteAddr: Multiaddr
/**
* Local peer id
*/
public readonly localPeer: PeerId
/**
* Remote peer id
*/
Expand Down Expand Up @@ -75,12 +64,10 @@ export class Connection {
* Any libp2p transport should use an upgrader to return this connection.
*/
constructor (init: ConnectionInit) {
const { localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat } = init
const { remoteAddr, remotePeer, newStream, close, getStreams, stat } = init

this.id = `${(parseInt(String(Math.random() * 1e9))).toString(36)}${Date.now()}`
this.localAddr = localAddr
this.remoteAddr = remoteAddr
this.localPeer = localPeer
this.remotePeer = remotePeer
this.stat = {
...stat,
Expand All @@ -98,17 +85,10 @@ export class Connection {
return 'Connection'
}

get [connectionSymbol] () {
get [symbol] () {
return true
}

/**
* Checks if the given value is a `Connection` instance
*/
static isConnection (other: any) {
return Boolean(connectionSymbol in other)
}

/**
* Get all the streams of the muxer
*/
Expand All @@ -119,7 +99,7 @@ export class Connection {
/**
* Create a new stream from this connection
*/
async newStream (protocols: string[]) {
async newStream (protocols: string | string[]) {
if (this.stat.status === CLOSING) {
throw errCode(new Error('the connection is being closed'), 'ERR_CONNECTION_BEING_CLOSED')
}
Expand All @@ -128,7 +108,9 @@ export class Connection {
throw errCode(new Error('the connection is closed'), 'ERR_CONNECTION_CLOSED')
}

if (!Array.isArray(protocols)) protocols = [protocols]
if (!Array.isArray(protocols)) {
protocols = [protocols]
}

const { stream, protocol } = await this._newStream(protocols)

Expand All @@ -143,9 +125,12 @@ export class Connection {
/**
* Add a stream when it is opened to the registry
*/
addStream (stream: Stream, metadata: Metadata) {
addStream (stream: Stream, metadata: Partial<Metadata> = {}) {
// Add metadata for the stream
this.registry.set(stream.id, metadata)
this.registry.set(stream.id, {
protocol: metadata.protocol ?? '',
metadata: metadata.metadata ?? {}
})
}

/**
Expand Down Expand Up @@ -174,3 +159,7 @@ export class Connection {
this.stat.status = CLOSED
}
}

export function createConnection (init: ConnectionInit): Connection {
return new ConnectionImpl(init)
}
12 changes: 3 additions & 9 deletions packages/libp2p-connection/test/compliance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import tests from '@libp2p/interface-compliance-tests/connection'
import { Connection } from '../src/index.js'
import { createConnection } from '../src/index.js'
import peers from '@libp2p/interface-compliance-tests/utils/peers'
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { Multiaddr } from '@multiformats/multiaddr'
Expand All @@ -13,19 +13,13 @@ describe('compliance tests', () => {
* certain values for testing.
*/
async setup (properties) {
const localAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8080')
const remoteAddr = new Multiaddr('/ip4/127.0.0.1/tcp/8081')
const [localPeer, remotePeer] = await Promise.all([
PeerIdFactory.createFromJSON(peers[0]),
PeerIdFactory.createFromJSON(peers[1])
])
const remotePeer = await PeerIdFactory.createFromJSON(peers[0])
const openStreams: Stream[] = []
let streamId = 0

const connection = new Connection({
localPeer,
const connection = createConnection({
remotePeer,
localAddr,
remoteAddr,
stat: {
timeline: {
Expand Down
7 changes: 2 additions & 5 deletions packages/libp2p-connection/test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Connection } from '../src/index.js'
import { createConnection } from '../src/index.js'
import * as PeerIdFactory from '@libp2p/peer-id-factory'
import { pair } from 'it-pair'
import { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -32,15 +32,12 @@ const peers = [{

describe('connection tests', () => {
it('should not require local or remote addrs', async () => {
const localPeer = await PeerIdFactory.createFromJSON(peers[0])
const remotePeer = await PeerIdFactory.createFromJSON(peers[1])

const openStreams: any[] = []
let streamId = 0

return new Connection({
localPeer,
localAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001'),
return createConnection({
remotePeer,
remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4002'),
stat: {
Expand Down
14 changes: 7 additions & 7 deletions packages/libp2p-interface-compliance-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,13 @@
"test:electron-main": "npm run test -- -t electron-main"
},
"dependencies": {
"@libp2p/crypto": "^0.22.7",
"@libp2p/interfaces": "^1.3.12",
"@libp2p/crypto": "^0.22.8",
"@libp2p/interfaces": "^1.3.0",
"@libp2p/logger": "^1.1.0",
"@libp2p/multistream-select": "^1.0.1",
"@libp2p/peer-id": "^1.1.6",
"@libp2p/peer-id-factory": "^1.0.6",
"@libp2p/pubsub": "^1.2.8",
"@libp2p/multistream-select": "^1.0.0",
"@libp2p/peer-id": "^1.1.0",
"@libp2p/peer-id-factory": "^1.0.0",
"@libp2p/pubsub": "^1.2.0",
"@multiformats/multiaddr": "^10.1.5",
"abortable-iterator": "^4.0.2",
"aegir": "^36.1.3",
Expand All @@ -216,7 +216,7 @@
"it-goodbye": "^4.0.1",
"it-map": "^1.0.6",
"it-ndjson": "^0.1.1",
"it-pair": "^2.0.0",
"it-pair": "^2.0.2",
"it-pipe": "^2.0.3",
"it-pushable": "^2.0.1",
"it-stream-types": "^1.0.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import peers from '../utils/peers.js'
import { UnexpectedPeerError } from '@libp2p/interfaces/connection-encrypter/errors'
import { createMaConnPair } from './utils/index.js'
import type { TestSetup } from '../index.js'
import type { Encrypter } from '@libp2p/interfaces/connection-encrypter'
import type { ConnectionEncrypter } from '@libp2p/interfaces/connection-encrypter'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { Source } from 'it-stream-types'

export default (common: TestSetup<Encrypter>) => {
export default (common: TestSetup<ConnectionEncrypter>) => {
describe('interface-connection-encrypter compliance tests', () => {
let crypto: Encrypter
let crypto: ConnectionEncrypter
let localPeer: PeerId
let remotePeer: PeerId
let mitmPeer: PeerId
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { peerIdFromBytes } from '@libp2p/peer-id'
import { handshake } from 'it-handshake'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { UnexpectedPeerError } from '@libp2p/interfaces/connection-encrypter/errors'
import { Multiaddr } from '@multiformats/multiaddr'
import type { ConnectionEncrypter } from '@libp2p/interfaces/connection-encrypter'
import type { Transform, Source } from 'it-stream-types'

// A basic transform that does nothing to the data
const transform = (): Transform<Uint8Array, Uint8Array> => {
return (source: Source<Uint8Array>) => (async function * () {
for await (const chunk of source) {
yield chunk
}
})()
}

export function mockConnectionEncrypter () {
const encrypter: ConnectionEncrypter = {
protocol: 'insecure',
secureInbound: async (localPeer, duplex, expectedPeer) => {
// 1. Perform a basic handshake.
const shake = handshake(duplex)
shake.write(localPeer.toBytes())
const remoteId = await shake.read()

if (remoteId == null) {
throw new Error('Could not read remote ID')
}

const remotePeer = peerIdFromBytes(remoteId.slice())
shake.rest()

if (expectedPeer != null && !expectedPeer.equals(remotePeer)) {
throw new UnexpectedPeerError()
}

// 2. Create your encryption box/unbox wrapper
const wrapper = duplexPair<Uint8Array>()
const encrypt = transform() // Use transform iterables to modify data
const decrypt = transform()

void pipe(
wrapper[0], // We write to wrapper
encrypt, // The data is encrypted
shake.stream, // It goes to the remote peer
decrypt, // Decrypt the incoming data
wrapper[0] // Pipe to the wrapper
)

return {
conn: {
...wrapper[1],
close: async () => {},
localAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001'),
remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4002'),
timeline: {
open: Date.now()
},
conn: true
},
remotePeer,
remoteEarlyData: new Uint8Array(0)
}
},
secureOutbound: async (localPeer, duplex, remotePeer) => {
// 1. Perform a basic handshake.
const shake = handshake(duplex)
shake.write(localPeer.toBytes())
const remoteId = await shake.read()

if (remoteId == null) {
throw new Error('Could not read remote ID')
}

shake.rest()

// 2. Create your encryption box/unbox wrapper
const wrapper = duplexPair<Uint8Array>()
const encrypt = transform()
const decrypt = transform()

void pipe(
wrapper[0], // We write to wrapper
encrypt, // The data is encrypted
shake.stream, // It goes to the remote peer
decrypt, // Decrypt the incoming data
wrapper[0] // Pipe to the wrapper
)

return {
conn: {
...wrapper[1],
close: async () => {},
localAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4001'),
remoteAddr: new Multiaddr('/ip4/127.0.0.1/tcp/4002'),
timeline: {
open: Date.now()
},
conn: true
},
remotePeer: peerIdFromBytes(remoteId.slice()),
remoteEarlyData: new Uint8Array(0)
}
}
}

return encrypter
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import { EventEmitter } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interfaces/src/connection'
import type { PeerId } from '@libp2p/interfaces/src/peer-id'
import type { ConnectionManager, ConnectionManagerEvents } from '@libp2p/interfaces/src/registrar'
import type { Connection } from '@libp2p/interfaces/connection'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import type { ConnectionManager, ConnectionManagerEvents } from '@libp2p/interfaces/registrar'

class MockConnectionManager extends EventEmitter<ConnectionManagerEvents> implements ConnectionManager {
getConnectionMap (): Map<string, Connection[]> {
throw new Error('Method not implemented.')
}

getConnectionList (): Connection[] {
throw new Error('Method not implemented.')
}

getConnections (): Connection[] {
throw new Error('Method not implemented.')
}

getConnection (peerId: PeerId): Connection | undefined {
throw new Error('Method not implemented.')
}
Expand Down
Loading