diff --git a/package.json b/package.json index 4a0a39aa..67fe2d33 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", + "sinon": "^9.0.0", "stats-lite": "^2.2.0", "uuid": "^3.3.2" }, diff --git a/src/index.js b/src/index.js index 380a7b72..3f8d508f 100644 --- a/src/index.js +++ b/src/index.js @@ -104,6 +104,10 @@ class Bitswap { this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { + if (wasWanted) { + this._sendHaveBlockNotifications(block) + } + return } @@ -282,32 +286,31 @@ class Bitswap { async putMany (blocks) { // eslint-disable-line require-await const self = this - // Add any new blocks to the blockstore - const newBlocks = [] - await this.blockstore.putMany(async function * () { - for await (const block of blocks) { - if (await self.blockstore.has(block.cid)) { - continue - } - - yield block - newBlocks.push(block) + for await (const block of blocks) { + if (await self.blockstore.has(block.cid)) { + continue } - }()) - - // Notify engine that we have new blocks - this.engine.receivedBlocks(newBlocks) - - // Notify listeners that we have received the new blocks - for (const block of newBlocks) { - this.notifications.hasBlock(block) - // Note: Don't wait for provide to finish before returning - this.network.provide(block.cid).catch((err) => { - self._log.error('Failed to provide: %s', err.message) - }) + + await this.blockstore.put(block) + + self._sendHaveBlockNotifications(block) } } + /** + * Sends notifications about the arrival of a block + * + * @param {Block} block + */ + _sendHaveBlockNotifications (block) { + this.notifications.hasBlock(block) + this.engine.receivedBlocks([block]) + // Note: Don't wait for provide to finish before returning + this.network.provide(block.cid).catch((err) => { + this._log.error('Failed to provide: %s', err.message) + }) + } + /** * Get the current list of wants. * diff --git a/test/bitswap.js b/test/bitswap.js index de4150db..3b8ea128 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -5,6 +5,8 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const delay = require('delay') +const PeerId = require('peer-id') +const sinon = require('sinon') const Bitswap = require('../src') @@ -12,6 +14,7 @@ const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const Message = require('../src/types/message') // Creates a repo + libp2pNode + Bitswap with or without DHT async function createThing (dht) { @@ -70,6 +73,52 @@ describe('bitswap without DHT', function () { finish.assert() }) + + it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => { + // the block we want + const block = await makeBlock() + + // id of a peer with the block we want + const peerId = await PeerId.create({ bits: 512 }) + + // incoming message with requested block from the other peer + const message = new Message(false) + message.addEntry(block.cid, 1, false) + message.addBlock(block) + + // slow blockstore + nodes[0].bitswap.blockstore = { + has: sinon.stub().withArgs(block.cid).returns(false), + put: sinon.stub() + } + + // add the block to our want list + const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) + + // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which + // is an async operation so `self.blockstore.has(cid)` will still return false + // until the write has completed + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store did not have it + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // another context wants the same block + const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) + + // meanwhile the blockstore finishes it's batch + nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true) + + // here it comes again + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store had it this time + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // both requests should get the block + expect(await wantBlockPromise1).to.deep.equal(block) + expect(await wantBlockPromise2).to.deep.equal(block) + }) }) describe('bitswap with DHT', function () {