Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update libp2p-pubsub #110

Merged
merged 7 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@
"debug": "^4.1.1",
"denque": "^1.4.1",
"err-code": "^2.0.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.0.1",
"libp2p-pubsub": "~0.5.3",
"p-map": "^4.0.0",
"libp2p-pubsub": "https://github.com/libp2p/js-libp2p-pubsub#v0.6.x",
"peer-id": "~0.13.12",
"protons": "^1.0.1",
"time-cache": "^0.3.0"
Expand All @@ -70,7 +68,7 @@
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^1.0.0",
"libp2p": "https://github.com/libp2p/js-libp2p#0.29.x",
"libp2p-floodsub": "^0.21.0",
"libp2p-floodsub": "https://github.com/chainsafe/js-libp2p-floodsub#chore/update-pubsub",
"libp2p-mplex": "^0.9.5",
"libp2p-noise": "^1.1.2",
"libp2p-websockets": "^0.13.6",
Expand Down
43 changes: 7 additions & 36 deletions test/2-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,16 @@ describe('2 nodes', () => {
new Promise(resolve => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
])

const [changedPeerId, changedTopics, changedSubs] = evt0
const [changedPeerId, changedSubs] = evt0

expectSet(nodes[0].subscriptions, [topic])
expectSet(nodes[1].subscriptions, [topic])
expect(nodes[0].peers.size).to.equal(1)
expect(nodes[1].peers.size).to.equal(1)
expectSet(first(nodes[0].peers).topics, [topic])
expectSet(first(nodes[1].peers).topics, [topic])
expectSet(nodes[0].topics.get(topic), [nodes[1].peerId.toB58String()])
expectSet(nodes[1].topics.get(topic), [nodes[0].peerId.toB58String()])

expect(changedPeerId.toB58String()).to.equal(first(nodes[0].peers).id.toB58String())
expectSet(changedTopics, [topic])
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])

// await heartbeats
Expand Down Expand Up @@ -197,33 +196,6 @@ describe('2 nodes', () => {
nodes[1].publish(topic, Buffer.from('banana'))
})
})

it('Publish 10 msg to a topic as array', (done) => {
let counter = 0

nodes[1].once(topic, shouldNotHappen)

nodes[0].on(topic, receivedMsg)

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(nodes[1].peerId.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql([topic])

if (++counter === 10) {
nodes[0].removeListener(topic, receivedMsg)
nodes[1].removeListener(topic, shouldNotHappen)
done()
}
}

const msgs = []
Array.from({ length: 10 }).forEach(() => {
msgs.push(Buffer.from('banana'))
})
nodes[1].publish(topic, msgs)
})
})

describe('publish after unsubscribe', () => {
Expand Down Expand Up @@ -254,15 +226,14 @@ describe('2 nodes', () => {
nodes[0].unsubscribe(topic)
expect(nodes[0].subscriptions.size).to.equal(0)

const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
const [changedPeerId, changedSubs] = await new Promise((resolve) => {
nodes[1].once('pubsub:subscription-change', (...args) => resolve(args))
})
await new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))

expect(nodes[1].peers.size).to.equal(1)
expectSet(first(nodes[1].peers).topics, [])
expectSet(nodes[1].topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodes[1].peers).id.toB58String())
expectSet(changedTopics, [])
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
})

Expand Down Expand Up @@ -324,11 +295,11 @@ describe('2 nodes', () => {

expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(1)
expectSet(first(nodes[1].peers).topics, ['Za'])
expectSet(nodes[1].topics.get('Za'), [nodes[0].peerId.toB58String()])

expectSet(nodes[1].subscriptions, ['Zb'])
expect(nodes[0].peers.size).to.equal(1)
expectSet(first(nodes[0].peers).topics, ['Zb'])
expectSet(nodes[0].topics.get('Zb'), [nodes[1].peerId.toB58String()])
})
})

Expand Down
14 changes: 6 additions & 8 deletions test/floodsub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ describe('gossipsub fallbacks to floodsub', () => {
nodeFs.subscribe(topic)

// await subscription change
const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
const [changedPeerId, changedSubs] = await new Promise((resolve) => {
nodeGs.once('pubsub:subscription-change', (...args) => resolve(args))
})
await delay(1000)
Expand All @@ -126,11 +126,10 @@ describe('gossipsub fallbacks to floodsub', () => {
expectSet(nodeFs.subscriptions, [topic])
expect(nodeGs.peers.size).to.equal(1)
expect(nodeFs.peers.size).to.equal(1)
expectSet(first(nodeGs.peers).topics, [topic])
expectSet(first(nodeFs.peers).topics, [topic])
expectSet(nodeGs.topics.get(topic), [nodeFs.peerId.toB58String()])
expectSet(nodeFs.topics.get(topic), [nodeGs.peerId.toB58String()])

expect(changedPeerId.toB58String()).to.equal(first(nodeGs.peers).id.toB58String())
expectSet(changedTopics, [topic])
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
})
})
Expand Down Expand Up @@ -249,7 +248,7 @@ describe('gossipsub fallbacks to floodsub', () => {

const msgs = []
times(10, (index) => msgs.push(Buffer.from('banana ' + index)))
nodeGs.publish(topic, msgs)
msgs.forEach(msg => nodeGs.publish(topic, msg))
})
})

Expand Down Expand Up @@ -293,14 +292,13 @@ describe('gossipsub fallbacks to floodsub', () => {
nodeGs.unsubscribe(topic)
expect(nodeGs.subscriptions.size).to.equal(0)

const [changedPeerId, changedTopics, changedSubs] = await new Promise((resolve) => {
const [changedPeerId, changedSubs] = await new Promise((resolve) => {
nodeFs.once('floodsub:subscription-change', (...args) => resolve(args))
})

expect(nodeFs.peers.size).to.equal(1)
expectSet(first(nodeFs.peers).topics, [])
expectSet(nodeFs.topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodeFs.peers).id.toB58String())
expectSet(changedTopics, [])
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
})

Expand Down
42 changes: 3 additions & 39 deletions test/multiple-nodes.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,9 @@ describe('multiple nodes (more than 2)', () => {
const bPeerId = b.peerId.toB58String()
const cPeerId = c.peerId.toB58String()

expectSet(a.peers.get(bPeerId).topics, [topic])
expectSet(b.peers.get(aPeerId).topics, [topic])
expectSet(b.peers.get(cPeerId).topics, [topic])
expectSet(c.peers.get(bPeerId).topics, [topic])
expectSet(a.topics.get(topic), [bPeerId])
expectSet(b.topics.get(topic), [cPeerId, aPeerId])
expectSet(c.topics.get(topic), [bPeerId])

expect(a.mesh.get(topic).size).to.equal(1)
expect(b.mesh.get(topic).size).to.equal(2)
Expand Down Expand Up @@ -119,41 +118,6 @@ describe('multiple nodes (more than 2)', () => {
expect(msgB.data.toString()).to.equal('hey')
expect(msgC.data.toString()).to.equal('hey')
})

it('publish array on node a', async function () {
this.timeout(10000)
let msgB = new Promise((resolve) => {
const output = []
b.on('Z', (msg) => {
output.push(msg)
if (output.length === 2) {
b.removeAllListeners('Z')
resolve(output)
}
})
})
let msgC = new Promise((resolve) => {
const output = []
c.on('Z', (msg) => {
output.push(msg)
if (output.length === 2) {
c.removeAllListeners('Z')
resolve(output)
}
})
})

a.publish('Z', [Buffer.from('hey'), Buffer.from('hey')])
msgB = await msgB
msgC = await msgC

expect(msgB.length).to.equal(2)
expect(msgB[0].data.toString()).to.equal('hey')
expect(msgB[1].data.toString()).to.equal('hey')
expect(msgC.length).to.equal(2)
expect(msgC[0].data.toString()).to.equal('hey')
expect(msgC[1].data.toString()).to.equal('hey')
})
})
})

Expand Down
68 changes: 42 additions & 26 deletions test/peerScore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)
}

ps._refreshScores()
Expand Down Expand Up @@ -137,8 +138,9 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)
}

ps._refreshScores()
Expand Down Expand Up @@ -176,8 +178,9 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)
}

ps._refreshScores()
Expand Down Expand Up @@ -244,15 +247,17 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)

ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)

ps.duplicateMessage(peerB, msg)
msg.receivedFrom = peerB
ps.duplicateMessage(msg)

// deliver duplicate from peer C after the window
await delay(tparams.meshMessageDeliveriesWindow + 5)
ps.duplicateMessage(peerC, msg)
msg.receivedFrom = peerC
ps.duplicateMessage(msg)
}
ps._refreshScores()
const aScore = ps.score(peerA)
Expand Down Expand Up @@ -298,8 +303,9 @@ describe('PeerScore', () => {
const nMessages = 40
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)
}
ps._refreshScores()
let aScore = ps.score(peerA)
Expand Down Expand Up @@ -361,8 +367,9 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.validateMessage(peerA, msg)
ps.deliverMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)
ps.deliverMessage(msg)
}
// peers A and B should both have zero scores, since the failure penalty hasn't been applied yet
ps._refreshScores()
Expand Down Expand Up @@ -404,7 +411,8 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.rejectMessage(peerA, msg)
msg.receivedFrom = peerA
ps.rejectMessage(msg)
}
ps._refreshScores()
let aScore = ps.score(peerA)
Expand Down Expand Up @@ -432,7 +440,8 @@ describe('PeerScore', () => {
const nMessages = 100
for (let i = 0; i < nMessages; i++) {
const msg = makeTestMessage(i, [mytopic])
ps.rejectMessage(peerA, msg)
msg.receivedFrom = peerA
ps.rejectMessage(msg)
}
ps._refreshScores()
let aScore = ps.score(peerA)
Expand Down Expand Up @@ -466,13 +475,15 @@ describe('PeerScore', () => {
ps.addPeer(peerB)

const msg = makeTestMessage(0, [mytopic])
msg.receivedFrom = peerA

// insert a record
ps.validateMessage(peerA, msg)
ps.validateMessage(msg)

// this should have no effect in the score, and subsequent duplicate messages should have no effect either
ps.ignoreMessage(peerA, msg)
ps.duplicateMessage(peerB, msg)
ps.ignoreMessage(msg)
msg.receivedFrom = peerB
ps.duplicateMessage(msg)

let aScore = ps.score(peerA)
let bScore = ps.score(peerB)
Expand All @@ -486,11 +497,13 @@ describe('PeerScore', () => {
ps.deliveryRecords.gc()

// insert a new record in the message deliveries
ps.validateMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)

// and reject the message to make sure duplicates are also penalized
ps.rejectMessage(peerA, msg)
ps.duplicateMessage(peerB, msg)
ps.rejectMessage(msg)
msg.receivedFrom = peerB
ps.duplicateMessage(msg)

aScore = ps.score(peerA)
bScore = ps.score(peerB)
Expand All @@ -504,11 +517,14 @@ describe('PeerScore', () => {
ps.deliveryRecords.gc()

// insert a new record in the message deliveries
ps.validateMessage(peerA, msg)
msg.receivedFrom = peerA
ps.validateMessage(msg)

// and reject the message after a duplicate has arrived
ps.duplicateMessage(peerB, msg)
ps.rejectMessage(peerA, msg)
msg.receivedFrom = peerB
ps.duplicateMessage(msg)
msg.receivedFrom = peerA
ps.rejectMessage(msg)

aScore = ps.score(peerA)
bScore = ps.score(peerB)
Expand Down
Loading