Skip to content

Commit

Permalink
feat: deny incoming connections and add allow/deny lists (#1398)
Browse files Browse the repository at this point in the history
When we reach our connection limit, deny incoming connections before the peer ids are exchanged instead of accepting them and trying to apply the limits after the exchange.

Adds a allow and deny lists of multiaddr prefixes to ensure we can always accept connections from a given network host even when we have reached out connection limit.

Outgoing behaviour remains the same, that is, you can exceed the limit while opening new connections and the connection manager will try to close 'low value' connections after the new connection has been made.

Co-authored-by: Marin Petrunić <[email protected]>
  • Loading branch information
achingbrain and mpetrunic authored Oct 6, 2022
1 parent ca30192 commit c185ef5
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export default {
// Use the last peer
const peerId = await createFromJSON(Peers[Peers.length - 1])
const libp2p = await createLibp2p({
connectionManager: {
inboundConnectionThreshold: Infinity
},
addresses: {
listen: [MULTIADDRS_WEBSOCKETS[0]]
},
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
"p-settle": "^5.0.0",
"private-ip": "^2.3.3",
"protons-runtime": "^3.0.1",
"rate-limiter-flexible": "^2.3.11",
"retimer": "^3.0.0",
"sanitize-filename": "^1.6.3",
"set-delayed-interval": "^1.0.0",
Expand Down
81 changes: 71 additions & 10 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import { Components, Initializable } from '@libp2p/components'
import * as STATUS from '@libp2p/interface-connection/status'
import type { AddressSorter } from '@libp2p/interface-peer-store'
import type { Resolver } from '@multiformats/multiaddr'
import { multiaddr, Multiaddr, Resolver } from '@multiformats/multiaddr'
import { PeerMap } from '@libp2p/peer-collections'
import { TimeoutController } from 'timeout-abort-controller'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
import { RateLimiterMemory } from 'rate-limiter-flexible'

const log = logger('libp2p:connection-manager')

Expand All @@ -31,7 +32,8 @@ const defaultOptions: Partial<ConnectionManagerInit> = {
maxEventLoopDelay: Infinity,
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000
movingAverageInterval: 60000,
inboundConnectionThreshold: 5
}

const METRICS_SYSTEM = 'libp2p'
Expand Down Expand Up @@ -132,6 +134,24 @@ export interface ConnectionManagerInit {
* tagged with KEEP_ALIVE up to this timeout in ms. (default: 60000)
*/
startupReconnectTimeout?: number

/**
* A list of multiaddrs that will always be allowed (except if they are in the
* deny list) to open connections to this node even if we've reached maxConnections
*/
allow?: string[]

/**
* A list of multiaddrs that will never be allowed to open connections to
* this node under any circumstances
*/
deny?: string[]

/**
* If more than this many connections are opened per second by a single
* host, reject subsequent connections
*/
inboundConnectionThreshold?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -152,6 +172,9 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private readonly startupReconnectTimeout: number
private connectOnStartupController?: TimeoutController
private readonly dialTimeout: number
private readonly allow: Multiaddr[]
private readonly deny: Multiaddr[]
private readonly inboundConnectionRateLimiter: RateLimiterMemory

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -187,6 +210,14 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven

this.startupReconnectTimeout = init.startupReconnectTimeout ?? STARTUP_RECONNECT_TIMEOUT
this.dialTimeout = init.dialTimeout ?? 30000

this.allow = (init.allow ?? []).map(ma => multiaddr(ma))
this.deny = (init.deny ?? []).map(ma => multiaddr(ma))

this.inboundConnectionRateLimiter = new RateLimiterMemory({
points: this.opts.inboundConnectionThreshold,
duration: 1
})
}

init (components: Components): void {
Expand Down Expand Up @@ -598,21 +629,16 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
log.trace('checking limit of %s. current value: %d of %d', name, value, limit)
if (value > limit) {
log('%s: limit exceeded: %p, %d/%d, pruning %d connection(s)', this.components.getPeerId(), name, value, limit, toPrune)
await this._maybePruneConnections(toPrune)
await this._pruneConnections(toPrune)
}
}

/**
* If we have more connections than our maximum, select some excess connections
* to prune based on peer value
*/
async _maybePruneConnections (toPrune: number) {
async _pruneConnections (toPrune: number) {
const connections = this.getConnections()

if (connections.length <= this.opts.minConnections || toPrune < 1) {
return
}

const peerValues = new PeerMap<number>()

// work out peer values
Expand Down Expand Up @@ -677,6 +703,41 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

async acceptIncomingConnection (maConn: MultiaddrConnection): Promise<boolean> {
return true
// check deny list
const denyConnection = this.deny.some(ma => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

if (denyConnection) {
log('connection from %s refused - connection remote address was in deny list', maConn.remoteAddr)
return false
}

// check allow list
const allowConnection = this.allow.some(ma => {
return maConn.remoteAddr.toString().startsWith(ma.toString())
})

if (allowConnection) {
return true
}

if (maConn.remoteAddr.isThinWaistAddress()) {
const host = maConn.remoteAddr.nodeAddress().address

try {
await this.inboundConnectionRateLimiter.consume(host, 1)
} catch {
log('connection from %s refused - inboundConnectionThreshold exceeded by host %s', host, maConn.remoteAddr)
return false
}
}

if (this.getConnections().length < this.opts.maxConnections) {
return true
}

log('connection from %s refused - maxConnections exceeded', maConn.remoteAddr)
return false
}
}
3 changes: 2 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ export enum codes {
ERR_ALREADY_SUCCEEDED = 'ERR_ALREADY_SUCCEEDED',
ERR_NO_HANDLER_FOR_PROTOCOL = 'ERR_NO_HANDLER_FOR_PROTOCOL',
ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS',
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS'
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS',
ERR_CONNECTION_DENIED = 'ERR_CONNECTION_DENIED'
}
7 changes: 7 additions & 0 deletions src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
* Upgrades an inbound connection
*/
async upgradeInbound (maConn: MultiaddrConnection): Promise<Connection> {
const accept = await this.components.getConnectionManager().acceptIncomingConnection(maConn)

if (!accept) {
await maConn.close()
throw errCode(new Error('connection denied'), codes.ERR_CONNECTION_DENIED)
}

let encryptedConn
let remotePeer
let upgradedConn: Duplex<Uint8Array>
Expand Down
139 changes: 136 additions & 3 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,24 @@ import sinon from 'sinon'
import { createNode } from '../utils/creators/peer.js'
import { createBaseOptions } from '../utils/base-options.browser.js'
import type { Libp2pNode } from '../../src/libp2p.js'
import type { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-mocks'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { CustomEvent } from '@libp2p/interfaces/events'
import { KEEP_ALIVE } from '@libp2p/interface-peer-store/tags'
import pWaitFor from 'p-wait-for'
import { multiaddr } from '@multiformats/multiaddr'
import { Components } from '@libp2p/components'
import { stubInterface } from 'ts-sinon'
import type { Dialer } from '@libp2p/interface-connection-manager'
import type { Connection } from '@libp2p/interface-connection'

const defaultOptions = {
maxConnections: 10,
minConnections: 1,
autoDialInterval: Infinity,
inboundUpgradeTimeout: 10000
}

describe('Connection Manager', () => {
let libp2p: Libp2pNode
Expand Down Expand Up @@ -68,7 +80,7 @@ describe('Connection Manager', () => {
await libp2p.start()

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections')
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_pruneConnections')
const spies = new Map<number, sinon.SinonSpy<[], Promise<void>>>()

// Add 1 connection too many
Expand Down Expand Up @@ -118,7 +130,7 @@ describe('Connection Manager', () => {
await libp2p.start()

const connectionManager = libp2p.components.getConnectionManager() as DefaultConnectionManager
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_maybePruneConnections')
const connectionManagerMaybeDisconnectOneSpy = sinon.spy(connectionManager, '_pruneConnections')

// Add 1 too many connections
const spy = sinon.spy()
Expand Down Expand Up @@ -171,4 +183,125 @@ describe('Connection Manager', () => {
expect(connectionManagerOpenConnectionSpy.called).to.be.true('Did not attempt to connect to important peer')
expect(connectionManagerOpenConnectionSpy.getCall(0).args[0].toString()).to.equal(peerId.toString(), 'Attempted to connect to the wrong peer')
})

it('should deny connections from denylist multiaddrs', async () => {
const remoteAddr = multiaddr('/ip4/83.13.55.32/tcp/59283')
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
deny: [
'/ip4/83.13.55.32'
]
})

const remotePeer = await createEd25519PeerId()
const maConn = mockMultiaddrConnection({
remoteAddr,
source: [],
sink: async () => {}
}, remotePeer)

await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.false()
})

it('should deny connections when maxConnections is exceeded', async () => {
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
maxConnections: 1
})

const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const components = new Components({
dialer
})

// set mocks
connectionManager.init(components)

// max out the connection limit
await connectionManager.openConnection(await createEd25519PeerId())
expect(connectionManager.getConnections()).to.have.lengthOf(1)

// an inbound connection is opened
const remotePeer = await createEd25519PeerId()
const maConn = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, remotePeer)

await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.false()
})

it('should deny connections from peers that connect too frequently', async () => {
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
inboundConnectionThreshold: 1
})

const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const components = new Components({
dialer
})

// set mocks
connectionManager.init(components)

// an inbound connection is opened
const remotePeer = await createEd25519PeerId()
const maConn = mockMultiaddrConnection({
source: [],
sink: async () => {},
// has to be thin waist, which it will be since we've not done the peer id handshake
// yet in the code being exercised by this test
remoteAddr: multiaddr('/ip4/34.4.63.125/tcp/4001')
}, remotePeer)

await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.true()

// connect again within a second
await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.false()
})

it('should allow connections from allowlist multiaddrs', async () => {
const remoteAddr = multiaddr('/ip4/83.13.55.32/tcp/59283')
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
maxConnections: 1,
allow: [
'/ip4/83.13.55.32'
]
})

const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const components = new Components({
dialer
})

// set mocks
connectionManager.init(components)

// max out the connection limit
await connectionManager.openConnection(await createEd25519PeerId())
expect(connectionManager.getConnections()).to.have.lengthOf(1)

// an inbound connection is opened from an address in the allow list
const remotePeer = await createEd25519PeerId()
const maConn = mockMultiaddrConnection({
remoteAddr,
source: [],
sink: async () => {}
}, remotePeer)

await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.true()
})
})
8 changes: 5 additions & 3 deletions test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import swarmKey from '../fixtures/swarm.key.js'
import { DefaultUpgrader } from '../../src/upgrader.js'
import { codes } from '../../src/errors.js'
import { mockConnectionGater, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-mocks'
import { mockConnectionGater, mockConnectionManager, mockMultiaddrConnPair, mockRegistrar, mockStream } from '@libp2p/interface-mocks'
import Peers from '../fixtures/peers.js'
import type { Upgrader } from '@libp2p/interface-transport'
import type { PeerId } from '@libp2p/interface-peer-id'
Expand Down Expand Up @@ -61,7 +61,8 @@ describe('Upgrader', () => {
connectionGater: mockConnectionGater(),
registrar: mockRegistrar(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore()
datastore: new MemoryDatastore(),
connectionManager: mockConnectionManager()
})
localMuxerFactory = new Mplex()
localUpgrader = new DefaultUpgrader(localComponents, {
Expand All @@ -79,7 +80,8 @@ describe('Upgrader', () => {
connectionGater: mockConnectionGater(),
registrar: mockRegistrar(),
peerStore: new PersistentPeerStore(),
datastore: new MemoryDatastore()
datastore: new MemoryDatastore(),
connectionManager: mockConnectionManager()
})
remoteUpgrader = new DefaultUpgrader(remoteComponents, {
connectionEncryption: [
Expand Down

0 comments on commit c185ef5

Please sign in to comment.