From 36fb61653919e6968ce42ed2e315353a014598a5 Mon Sep 17 00:00:00 2001 From: dirkmc Date: Thu, 12 Dec 2019 10:42:02 -0500 Subject: [PATCH] feat: include pending bytes in response message (#205) --- src/decision-engine/index.js | 12 ++++++++---- src/types/message/index.js | 10 ++++++++++ src/types/message/message.proto.js | 2 +- test/decision-engine/decision-engine.js | 26 +++++++++++++++++++++---- test/types/message.spec.js | 6 +++++- 5 files changed, 46 insertions(+), 10 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 99a4882f..e778846b 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -28,15 +28,17 @@ class DecisionEngine { async _sendBlocks (peer, blocks) { // split into messages of max 512 * 1024 bytes - const total = blocks.reduce((acc, b) => { + let total = blocks.reduce((acc, b) => { return acc + b.data.byteLength }, 0) + // If the blocks fit into one message, send the message right away if (total < MAX_MESSAGE_SIZE) { - await this._sendSafeBlocks(peer, blocks) + await this._sendSafeBlocks(peer, blocks, 0) return } + // The blocks don't all fit into one message so we need to split them up let size = 0 let batch = [] let outstanding = blocks.length @@ -45,6 +47,7 @@ class DecisionEngine { outstanding-- batch.push(b) size += b.data.byteLength + total -= b.data.byteLength if (size >= MAX_MESSAGE_SIZE || // need to ensure the last remaining items get sent @@ -53,7 +56,7 @@ class DecisionEngine { const nextBatch = batch.slice() batch = [] try { - await this._sendSafeBlocks(peer, nextBatch) + await this._sendSafeBlocks(peer, nextBatch, total) } catch (err) { // catch the error so as to send as many blocks as we can this._log('sendblock error: %s', err.message) @@ -62,9 +65,10 @@ class DecisionEngine { } } - async _sendSafeBlocks (peer, blocks) { + async _sendSafeBlocks (peer, blocks, pendingBytes) { const msg = new Message(false) blocks.forEach((b) => msg.addBlock(b)) + msg.setPendingBytes(pendingBytes) await this.network.sendMessage(peer, msg) } diff --git a/src/types/message/index.js b/src/types/message/index.js index 1dd9c68d..d87089a1 100644 --- a/src/types/message/index.js +++ b/src/types/message/index.js @@ -15,6 +15,7 @@ class BitswapMessage { this.full = full this.wantlist = new Map() this.blocks = new Map() + this.pendingBytes = 0 } get empty () { @@ -46,6 +47,10 @@ class BitswapMessage { this.addEntry(cid, 0, true) } + setPendingBytes (size) { + this.pendingBytes = size + } + /* * Serializes to Bitswap Message protobuf of * version 1.0.0 @@ -101,6 +106,10 @@ class BitswapMessage { }) }) + if (this.pendingBytes > 0) { + msg.pendingBytes = this.pendingBytes + } + return pbm.Message.encode(msg) } @@ -162,6 +171,7 @@ BitswapMessage.deserialize = async (raw) => { const cid = new CID(cidVersion, getName(multicodec), hash) msg.addBlock(new Block(p.data, cid)) })) + msg.setPendingBytes(decoded.pendingBytes) return msg } diff --git a/src/types/message/message.proto.js b/src/types/message/message.proto.js index f040f6b8..258f3437 100644 --- a/src/types/message/message.proto.js +++ b/src/types/message/message.proto.js @@ -41,6 +41,6 @@ module.exports = ` repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0 repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0 repeated BlockPresence blockPresences = 4; - int32 PendingBytes = 5; + int32 pendingBytes = 5; } ` diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index 2e036cd2..b9d9c63a 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -159,6 +159,17 @@ describe('Engine', () => { }) it('splits large block messages', () => { + const sum = (nums) => nums.reduce((a, b) => a + b, 0) + + const getMessageSizes = (messages) => { + const sizes = [] + for (const [, msg] of messages) { + const blocks = [...msg.blocks.values()] + sizes.push(sum(blocks.map(b => b.data.byteLength))) + } + return sizes + } + const data = range(10).map((i) => { const b = Buffer.alloc(1024 * 256) b.fill(i) @@ -167,11 +178,16 @@ describe('Engine', () => { return new Promise((resolve, reject) => { const net = mockNetwork(5, (res) => { - res.messages.forEach((message) => { + const messageSizes = getMessageSizes(res.messages) + for (let i = 0; i < res.messages.length; i++) { + const [, message] = res.messages[i] // The batch size is big enough to hold two blocks, so every // message should contain two blocks - expect(message[1].blocks.size).to.eql(2) - }) + expect(message.blocks.size).to.eql(2) + // The pending bytes should be the sum of the size of blocks in the + // remaining messages + expect(message.pendingBytes).to.eql(sum(messageSizes.slice(i + 1))) + } resolve() }) @@ -189,10 +205,12 @@ describe('Engine', () => { const blocks = res[1] const cids = blocks.map((b) => b.cid) + // Put blocks into the node's blockstore await Promise.all((blocks.map((b) => sf.blockstore.put(b)))) + + // Simulate receiving a wantlist for all the blocks const msg = new Message(false) cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i)) - sf.messageReceived(id, msg) }) .catch(reject) diff --git a/test/types/message.spec.js b/test/types/message.spec.js index 48746de8..d96e1949 100644 --- a/test/types/message.spec.js +++ b/test/types/message.spec.js @@ -53,11 +53,13 @@ describe('BitswapMessage', () => { const block = blocks[1] const msg = new BitswapMessage(true) msg.addBlock(block) + msg.setPendingBytes(10) const serialized = msg.serializeToBitswap110() const decoded = pbm.Message.decode(serialized) expect(decoded.payload[0].data).to.eql(block.data) + expect(decoded.pendingBytes).to.eql(10) }) it('.deserialize a Bitswap100 Message', async () => { @@ -120,7 +122,8 @@ describe('BitswapMessage', () => { }, { data: b2.data, prefix: cid2.prefix - }] + }], + pendingBytes: 10 }) const msg = await BitswapMessage.deserialize(raw) @@ -137,6 +140,7 @@ describe('BitswapMessage', () => { [cid1.toString('base58btc'), b1.data], [cid2.toString('base58btc'), b2.data] ]) + expect(msg.pendingBytes).to.equal(10) }) it('ignores duplicates', () => {