From 57185eacfbd76f10b9a129cab359d110374c6cd6 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 4 Mar 2020 19:01:39 +0000 Subject: [PATCH] fix: race condition when requesting the same block twice When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true` it'll get added to the want list. If the block then finishes it's batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang idefinately. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block. --- package.json | 1 + src/index.js | 47 +++++++++++++++++++++++++---------------------- test/bitswap.js | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 75 insertions(+), 22 deletions(-) diff --git a/package.json b/package.json index 4a0a39aa..67fe2d33 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "peer-info": "^0.17.0", "promisify-es6": "^1.0.3", "rimraf": "^3.0.0", + "sinon": "^9.0.0", "stats-lite": "^2.2.0", "uuid": "^3.3.2" }, diff --git a/src/index.js b/src/index.js index 380a7b72..3f8d508f 100644 --- a/src/index.js +++ b/src/index.js @@ -104,6 +104,10 @@ class Bitswap { this._updateReceiveCounters(peerId.toB58String(), block, has) if (has || !wasWanted) { + if (wasWanted) { + this._sendHaveBlockNotifications(block) + } + return } @@ -282,32 +286,31 @@ class Bitswap { async putMany (blocks) { // eslint-disable-line require-await const self = this - // Add any new blocks to the blockstore - const newBlocks = [] - await this.blockstore.putMany(async function * () { - for await (const block of blocks) { - if (await self.blockstore.has(block.cid)) { - continue - } - - yield block - newBlocks.push(block) + for await (const block of blocks) { + if (await self.blockstore.has(block.cid)) { + continue } - }()) - - // Notify engine that we have new blocks - this.engine.receivedBlocks(newBlocks) - - // Notify listeners that we have received the new blocks - for (const block of newBlocks) { - this.notifications.hasBlock(block) - // Note: Don't wait for provide to finish before returning - this.network.provide(block.cid).catch((err) => { - self._log.error('Failed to provide: %s', err.message) - }) + + await this.blockstore.put(block) + + self._sendHaveBlockNotifications(block) } } + /** + * Sends notifications about the arrival of a block + * + * @param {Block} block + */ + _sendHaveBlockNotifications (block) { + this.notifications.hasBlock(block) + this.engine.receivedBlocks([block]) + // Note: Don't wait for provide to finish before returning + this.network.provide(block.cid).catch((err) => { + this._log.error('Failed to provide: %s', err.message) + }) + } + /** * Get the current list of wants. * diff --git a/test/bitswap.js b/test/bitswap.js index de4150db..3b8ea128 100644 --- a/test/bitswap.js +++ b/test/bitswap.js @@ -5,6 +5,8 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const delay = require('delay') +const PeerId = require('peer-id') +const sinon = require('sinon') const Bitswap = require('../src') @@ -12,6 +14,7 @@ const createTempRepo = require('./utils/create-temp-repo-nodejs') const createLibp2pNode = require('./utils/create-libp2p-node') const makeBlock = require('./utils/make-block') const orderedFinish = require('./utils/helpers').orderedFinish +const Message = require('../src/types/message') // Creates a repo + libp2pNode + Bitswap with or without DHT async function createThing (dht) { @@ -70,6 +73,52 @@ describe('bitswap without DHT', function () { finish.assert() }) + + it('wants a block, receives a block, wants it again before the blockstore has it, receives it after the blockstore has it', async () => { + // the block we want + const block = await makeBlock() + + // id of a peer with the block we want + const peerId = await PeerId.create({ bits: 512 }) + + // incoming message with requested block from the other peer + const message = new Message(false) + message.addEntry(block.cid, 1, false) + message.addBlock(block) + + // slow blockstore + nodes[0].bitswap.blockstore = { + has: sinon.stub().withArgs(block.cid).returns(false), + put: sinon.stub() + } + + // add the block to our want list + const wantBlockPromise1 = nodes[0].bitswap.get(block.cid) + + // oh look, a peer has sent it to us - this will trigger a `blockstore.put` which + // is an async operation so `self.blockstore.has(cid)` will still return false + // until the write has completed + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store did not have it + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // another context wants the same block + const wantBlockPromise2 = nodes[0].bitswap.get(block.cid) + + // meanwhile the blockstore finishes it's batch + nodes[0].bitswap.blockstore.has = sinon.stub().withArgs(block.cid).returns(true) + + // here it comes again + await nodes[0].bitswap._receiveMessage(peerId, message) + + // block store had it this time + expect(nodes[0].bitswap.blockstore.has.calledWith(block.cid)).to.be.true() + + // both requests should get the block + expect(await wantBlockPromise1).to.deep.equal(block) + expect(await wantBlockPromise2).to.deep.equal(block) + }) }) describe('bitswap with DHT', function () {