Skip to content

Commit c27e844

Browse files
authored
fix(NODE-3833): return early on end if gridfs upload stream is already ended (#3223)
1 parent b0085a2 commit c27e844

File tree

2 files changed

+76
-1
lines changed

2 files changed

+76
-1
lines changed

src/gridfs/upload.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ export class GridFSBucketWriteStream extends Writable implements NodeJS.Writable
196196
? encodingOrCallback
197197
: callback;
198198

199-
if (checkAborted(this, callback)) return this;
199+
if (this.state.streamEnd || checkAborted(this, callback)) return this;
200200

201201
this.state.streamEnd = true;
202202

test/integration/gridfs/gridfs_stream.test.js

+75
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ const fs = require('fs');
66
const { setupDatabase, withClient } = require('./../shared');
77
const { expect } = require('chai');
88
const { GridFSBucket, ObjectId } = require('../../../src');
9+
const sinon = require('sinon');
10+
const { sleep } = require('../../tools/utils');
911

1012
describe('GridFS Stream', function () {
1113
before(function () {
@@ -947,6 +949,79 @@ describe('GridFS Stream', function () {
947949
}
948950
});
949951

952+
describe('upload stream end()', () => {
953+
let client, db;
954+
955+
afterEach(async () => {
956+
sinon.restore();
957+
await client.close();
958+
});
959+
960+
it('should not call the callback on repeat calls to end', {
961+
metadata: { requires: { topology: ['single'] } },
962+
963+
async test() {
964+
const configuration = this.configuration;
965+
client = configuration.newClient(configuration.writeConcernMax(), {
966+
maxPoolSize: 1
967+
});
968+
await client.connect();
969+
db = client.db(configuration.db);
970+
const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
971+
const uploadStream = bucket.openUploadStream('test.dat');
972+
973+
const endPromise = new Promise(resolve => {
974+
uploadStream.end('1', resolve);
975+
});
976+
977+
const endPromise2 = new Promise((resolve, reject) => {
978+
uploadStream.end('2', () => {
979+
reject(new Error('Expected callback to not be called on duplicate end'));
980+
});
981+
});
982+
983+
await endPromise;
984+
// in the fail case, the callback would be called when the actual write is finished,
985+
// so we need to give it a moment
986+
await Promise.race([endPromise2, sleep(100)]);
987+
}
988+
});
989+
990+
it('should not write a chunk on repeat calls to end', {
991+
metadata: { requires: { topology: ['single'] } },
992+
993+
async test() {
994+
const configuration = this.configuration;
995+
client = configuration.newClient(configuration.writeConcernMax(), {
996+
maxPoolSize: 1
997+
});
998+
await client.connect();
999+
db = client.db(this.configuration.db);
1000+
const bucket = new GridFSBucket(db, { bucketName: 'gridfsabort', chunkSizeBytes: 1 });
1001+
const uploadStream = bucket.openUploadStream('test.dat');
1002+
const spy = sinon.spy(uploadStream, 'write');
1003+
1004+
const endPromise = new Promise(resolve => {
1005+
uploadStream.end('1', resolve);
1006+
});
1007+
1008+
await endPromise;
1009+
expect(spy).to.have.been.calledWith('1');
1010+
1011+
uploadStream.end('2');
1012+
1013+
// wait for potential async calls to happen before we close the client
1014+
// so that we don't get a client not connected failure in the afterEach
1015+
// in the failure case since it would be confusing and unnecessary
1016+
// given the assertions we already have for this case
1017+
await sleep(100);
1018+
1019+
expect(spy).not.to.have.been.calledWith('2');
1020+
expect(spy.calledOnce).to.be.true;
1021+
}
1022+
});
1023+
});
1024+
9501025
/**
9511026
* Provide start and end parameters for file download to skip ahead x bytes and limit the total amount of bytes read to n
9521027
*

0 commit comments

Comments
 (0)