Skip to content

Commit

Permalink
feat: include pending bytes in response message (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 10, 2020
1 parent 11f5ff7 commit 36fb616
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 10 deletions.
12 changes: 8 additions & 4 deletions src/decision-engine/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ class DecisionEngine {

async _sendBlocks (peer, blocks) {
// split into messages of max 512 * 1024 bytes
const total = blocks.reduce((acc, b) => {
let total = blocks.reduce((acc, b) => {
return acc + b.data.byteLength
}, 0)

// If the blocks fit into one message, send the message right away
if (total < MAX_MESSAGE_SIZE) {
await this._sendSafeBlocks(peer, blocks)
await this._sendSafeBlocks(peer, blocks, 0)
return
}

// The blocks don't all fit into one message so we need to split them up
let size = 0
let batch = []
let outstanding = blocks.length
Expand All @@ -45,6 +47,7 @@ class DecisionEngine {
outstanding--
batch.push(b)
size += b.data.byteLength
total -= b.data.byteLength

if (size >= MAX_MESSAGE_SIZE ||
// need to ensure the last remaining items get sent
Expand All @@ -53,7 +56,7 @@ class DecisionEngine {
const nextBatch = batch.slice()
batch = []
try {
await this._sendSafeBlocks(peer, nextBatch)
await this._sendSafeBlocks(peer, nextBatch, total)
} catch (err) {
// catch the error so as to send as many blocks as we can
this._log('sendblock error: %s', err.message)
Expand All @@ -62,9 +65,10 @@ class DecisionEngine {
}
}

async _sendSafeBlocks (peer, blocks) {
async _sendSafeBlocks (peer, blocks, pendingBytes) {
const msg = new Message(false)
blocks.forEach((b) => msg.addBlock(b))
msg.setPendingBytes(pendingBytes)

await this.network.sendMessage(peer, msg)
}
Expand Down
10 changes: 10 additions & 0 deletions src/types/message/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class BitswapMessage {
this.full = full
this.wantlist = new Map()
this.blocks = new Map()
this.pendingBytes = 0
}

get empty () {
Expand Down Expand Up @@ -46,6 +47,10 @@ class BitswapMessage {
this.addEntry(cid, 0, true)
}

setPendingBytes (size) {
this.pendingBytes = size
}

/*
* Serializes to Bitswap Message protobuf of
* version 1.0.0
Expand Down Expand Up @@ -101,6 +106,10 @@ class BitswapMessage {
})
})

if (this.pendingBytes > 0) {
msg.pendingBytes = this.pendingBytes
}

return pbm.Message.encode(msg)
}

Expand Down Expand Up @@ -162,6 +171,7 @@ BitswapMessage.deserialize = async (raw) => {
const cid = new CID(cidVersion, getName(multicodec), hash)
msg.addBlock(new Block(p.data, cid))
}))
msg.setPendingBytes(decoded.pendingBytes)
return msg
}

Expand Down
2 changes: 1 addition & 1 deletion src/types/message/message.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ module.exports = `
repeated bytes blocks = 2; // used to send Blocks in bitswap 1.0.0
repeated Block payload = 3; // used to send Blocks in bitswap 1.1.0
repeated BlockPresence blockPresences = 4;
int32 PendingBytes = 5;
int32 pendingBytes = 5;
}
`
26 changes: 22 additions & 4 deletions test/decision-engine/decision-engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ describe('Engine', () => {
})

it('splits large block messages', () => {
const sum = (nums) => nums.reduce((a, b) => a + b, 0)

const getMessageSizes = (messages) => {
const sizes = []
for (const [, msg] of messages) {
const blocks = [...msg.blocks.values()]
sizes.push(sum(blocks.map(b => b.data.byteLength)))
}
return sizes
}

const data = range(10).map((i) => {
const b = Buffer.alloc(1024 * 256)
b.fill(i)
Expand All @@ -167,11 +178,16 @@ describe('Engine', () => {

return new Promise((resolve, reject) => {
const net = mockNetwork(5, (res) => {
res.messages.forEach((message) => {
const messageSizes = getMessageSizes(res.messages)
for (let i = 0; i < res.messages.length; i++) {
const [, message] = res.messages[i]
// The batch size is big enough to hold two blocks, so every
// message should contain two blocks
expect(message[1].blocks.size).to.eql(2)
})
expect(message.blocks.size).to.eql(2)
// The pending bytes should be the sum of the size of blocks in the
// remaining messages
expect(message.pendingBytes).to.eql(sum(messageSizes.slice(i + 1)))
}
resolve()
})

Expand All @@ -189,10 +205,12 @@ describe('Engine', () => {
const blocks = res[1]
const cids = blocks.map((b) => b.cid)

// Put blocks into the node's blockstore
await Promise.all((blocks.map((b) => sf.blockstore.put(b))))

// Simulate receiving a wantlist for all the blocks
const msg = new Message(false)
cids.forEach((c, i) => msg.addEntry(c, Math.pow(2, 32) - 1 - i))

sf.messageReceived(id, msg)
})
.catch(reject)
Expand Down
6 changes: 5 additions & 1 deletion test/types/message.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ describe('BitswapMessage', () => {
const block = blocks[1]
const msg = new BitswapMessage(true)
msg.addBlock(block)
msg.setPendingBytes(10)

const serialized = msg.serializeToBitswap110()
const decoded = pbm.Message.decode(serialized)

expect(decoded.payload[0].data).to.eql(block.data)
expect(decoded.pendingBytes).to.eql(10)
})

it('.deserialize a Bitswap100 Message', async () => {
Expand Down Expand Up @@ -120,7 +122,8 @@ describe('BitswapMessage', () => {
}, {
data: b2.data,
prefix: cid2.prefix
}]
}],
pendingBytes: 10
})

const msg = await BitswapMessage.deserialize(raw)
Expand All @@ -137,6 +140,7 @@ describe('BitswapMessage', () => {
[cid1.toString('base58btc'), b1.data],
[cid2.toString('base58btc'), b2.data]
])
expect(msg.pendingBytes).to.equal(10)
})

it('ignores duplicates', () => {
Expand Down

0 comments on commit 36fb616

Please sign in to comment.