Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6275): Add CSOT support to GridFS #4246

Merged
merged 32 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
867d35e
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
065b214
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
4180272
feat(NODE-6304): add CSOT support for non-tailable cursors (#4195)
W-A-James Sep 12, 2024
d6b9e58
WIP
W-A-James Sep 23, 2024
4f73e77
unskip gridfs tests
W-A-James Sep 23, 2024
0b2ff73
first draft implementation
W-A-James Sep 24, 2024
b35faeb
lint
W-A-James Sep 24, 2024
9c30cab
start prose test implementatiuon
W-A-James Sep 24, 2024
b469017
WIP
W-A-James Sep 25, 2024
a69eba7
lint (WIP)
W-A-James Sep 25, 2024
72ccc6a
feat(NODE-5682): set maxTimeMS on commands and preempt I/O (#4174)
nbbeeken Jul 26, 2024
8c01d69
feat(NODE-6313): add CSOT support to sessions and transactions (#4199)
nbbeeken Sep 9, 2024
4aecf73
Working
W-A-James Sep 25, 2024
98e6963
add test
W-A-James Sep 25, 2024
15c84f5
add helper to clean up code
W-A-James Sep 25, 2024
38e4c20
lint
W-A-James Sep 26, 2024
234f357
use latest mongodb-legacy
W-A-James Sep 27, 2024
95f4179
skip tests on unsupported server versions
W-A-James Sep 27, 2024
5be2d94
review comments
W-A-James Sep 27, 2024
b66c454
remove unneeded listener
W-A-James Sep 27, 2024
171458f
update tests
W-A-James Sep 27, 2024
88a7aef
update UTR
W-A-James Sep 30, 2024
0707ac3
fix UTR
W-A-James Sep 30, 2024
1f81670
convert getRemainingTimeMSOrThrow to method on CSOTTimeoutContext
W-A-James Sep 30, 2024
44509b6
process errors correctly and call callback asynchronously
W-A-James Oct 1, 2024
b2c361e
review comment cleanup
W-A-James Oct 1, 2024
352b923
rebase fixup
W-A-James Oct 1, 2024
4f08744
remove comment
W-A-James Oct 1, 2024
d70d43e
Update src/gridfs/index.ts
W-A-James Oct 2, 2024
096e51f
ensure options are resulved in constructor and not always pulled from db
W-A-James Oct 2, 2024
1e1d0f4
Merge branch 'NODE-6090' into NODE-6275
W-A-James Oct 3, 2024
e044a5c
extend test timeouts
W-A-James Oct 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
"mocha": "^10.4.0",
"mocha-sinon": "^2.1.2",
"mongodb-client-encryption": "^6.1.0",
"mongodb-legacy": "^6.1.1",
"mongodb-legacy": "^6.1.2",
"nyc": "^15.1.0",
"prettier": "^3.3.3",
"semver": "^7.6.3",
Expand Down
10 changes: 8 additions & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,18 @@ export class Collection<TSchema extends Document = Document> {
*/
async findOne(): Promise<WithId<TSchema> | null>;
async findOne(filter: Filter<TSchema>): Promise<WithId<TSchema> | null>;
async findOne(filter: Filter<TSchema>, options: FindOptions): Promise<WithId<TSchema> | null>;
async findOne(
filter: Filter<TSchema>,
options: Omit<FindOptions, 'timeoutMode'>
): Promise<WithId<TSchema> | null>;

// allow an override of the schema.
async findOne<T = TSchema>(): Promise<T | null>;
async findOne<T = TSchema>(filter: Filter<TSchema>): Promise<T | null>;
async findOne<T = TSchema>(filter: Filter<TSchema>, options?: FindOptions): Promise<T | null>;
async findOne<T = TSchema>(
filter: Filter<TSchema>,
options?: Omit<FindOptions, 'timeoutMode'>
): Promise<T | null>;

async findOne(
filter: Filter<TSchema> = {},
Expand Down
44 changes: 40 additions & 4 deletions src/gridfs/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Readable } from 'stream';

import type { Document, ObjectId } from '../bson';
import type { Collection } from '../collection';
import { CursorTimeoutMode } from '../cursor/abstract_cursor';
import type { FindCursor } from '../cursor/find_cursor';
import {
MongoGridFSChunkError,
Expand All @@ -12,6 +13,7 @@ import {
import type { FindOptions } from '../operations/find';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import type { Callback } from '../utils';
import type { GridFSChunk } from './upload';

Expand All @@ -28,7 +30,7 @@ export interface GridFSBucketReadStreamOptions {
* to be returned by the stream. `end` is non-inclusive
*/
end?: number;
/** @internal TODO(NODE-5688): make this public */
/** @public */
timeoutMS?: number;
}

Expand Down Expand Up @@ -98,8 +100,10 @@ export interface GridFSBucketReadStreamPrivate {
skip?: number;
start: number;
end: number;
timeoutMS?: number;
};
readPreference?: ReadPreference;
timeoutContext?: CSOTTimeoutContext;
}

/**
Expand Down Expand Up @@ -148,7 +152,11 @@ export class GridFSBucketReadStream extends Readable {
end: 0,
...options
},
readPreference
readPreference,
timeoutContext:
options?.timeoutMS != null
? new CSOTTimeoutContext({ timeoutMS: options.timeoutMS, serverSelectionTimeoutMS: 0 })
: undefined
};
}

Expand Down Expand Up @@ -196,7 +204,8 @@ export class GridFSBucketReadStream extends Readable {
async abort(): Promise<void> {
this.push(null);
this.destroy();
await this.s.cursor?.close();
const remainingTimeMS = this.s.timeoutContext?.getRemainingTimeMSOrThrow();
await this.s.cursor?.close({ timeoutMS: remainingTimeMS });
}
}

Expand Down Expand Up @@ -352,7 +361,22 @@ function init(stream: GridFSBucketReadStream): void {
filter['n'] = { $gte: skip };
}
}
stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

let remainingTimeMS: number | undefined;
try {
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
);
} catch (error) {
return stream.destroy(error);
}

stream.s.cursor = stream.s.chunks
.find(filter, {
timeoutMode: stream.s.options.timeoutMS != null ? CursorTimeoutMode.LIFETIME : undefined,
timeoutMS: remainingTimeMS
})
.sort({ n: 1 });

if (stream.s.readPreference) {
stream.s.cursor.withReadPreference(stream.s.readPreference);
Expand All @@ -371,6 +395,18 @@ function init(stream: GridFSBucketReadStream): void {
return;
};

let remainingTimeMS: number | undefined;
try {
remainingTimeMS = stream.s.timeoutContext?.getRemainingTimeMSOrThrow(
`Download timed out after ${stream.s.timeoutContext?.timeoutMS}ms`
);
} catch (error) {
if (!stream.destroyed) stream.destroy(error);
return;
}

findOneOptions.timeoutMS = remainingTimeMS;

stream.s.files.findOne(stream.s.filter, findOneOptions).then(handleReadResult, error => {
if (stream.destroyed) return;
stream.destroy(error);
Expand Down
71 changes: 58 additions & 13 deletions src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import type { ObjectId } from '../bson';
import type { Collection } from '../collection';
import type { FindCursor } from '../cursor/find_cursor';
import type { Db } from '../db';
import { MongoRuntimeError } from '../error';
import { MongoOperationTimeoutError, MongoRuntimeError } from '../error';
import { type Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import { resolveOptions } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -48,6 +50,7 @@ export interface GridFSBucketPrivate {
chunkSizeBytes: number;
readPreference?: ReadPreference;
writeConcern: WriteConcern | undefined;
timeoutMS?: number;
};
_chunksCollection: Collection<GridFSChunk>;
_filesCollection: Collection<GridFSFile>;
Expand Down Expand Up @@ -82,6 +85,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
super();
this.setMaxListeners(0);
const privateOptions = {
timeoutMS: db.timeoutMS,
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
...options,
writeConcern: WriteConcern.fromOptions(options)
Expand Down Expand Up @@ -109,7 +113,10 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
filename: string,
options?: GridFSBucketWriteStreamOptions
): GridFSBucketWriteStream {
return new GridFSBucketWriteStream(this, filename, options);
return new GridFSBucketWriteStream(this, filename, {
timeoutMS: this.s.options.timeoutMS,
...options
});
}

/**
Expand All @@ -122,7 +129,11 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
filename: string,
options?: GridFSBucketWriteStreamOptions
): GridFSBucketWriteStream {
return new GridFSBucketWriteStream(this, filename, { ...options, id });
return new GridFSBucketWriteStream(this, filename, {
timeoutMS: this.s.db.timeoutMS,
...options,
id
});
}

/** Returns a readable stream (GridFSBucketReadStream) for streaming file data from GridFS. */
Expand All @@ -135,7 +146,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
this.s._filesCollection,
this.s.options.readPreference,
{ _id: id },
options
{ timeoutMS: this.s.db.timeoutMS, ...options }
);
}

Expand All @@ -144,11 +155,27 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
*
* @param id - The id of the file doc
*/
async delete(id: ObjectId): Promise<void> {
const { deletedCount } = await this.s._filesCollection.deleteOne({ _id: id });
async delete(id: ObjectId, options?: { timeoutMS: number }): Promise<void> {
const { timeoutMS } = resolveOptions(this.s.db, options);
let timeoutContext: CSOTTimeoutContext | undefined = undefined;

if (timeoutMS) {
timeoutContext = new CSOTTimeoutContext({
timeoutMS,
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
});
}

const { deletedCount } = await this.s._filesCollection.deleteOne(
{ _id: id },
{ timeoutMS: timeoutContext?.remainingTimeMS }
);

const remainingTimeMS = timeoutContext?.remainingTimeMS;
if (remainingTimeMS != null && remainingTimeMS <= 0)
throw new MongoOperationTimeoutError(`Timed out after ${timeoutMS}ms`);
// Delete orphaned chunks before returning FileNotFound
await this.s._chunksCollection.deleteMany({ files_id: id });
await this.s._chunksCollection.deleteMany({ files_id: id }, { timeoutMS: remainingTimeMS });

if (deletedCount === 0) {
// TODO(NODE-3483): Replace with more appropriate error
Expand Down Expand Up @@ -188,7 +215,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
this.s._filesCollection,
this.s.options.readPreference,
{ filename },
{ ...options, sort, skip }
{ timeoutMS: this.s.db.timeoutMS, ...options, sort, skip }
);
}

Expand All @@ -198,18 +225,36 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {
* @param id - the id of the file to rename
* @param filename - new name for the file
*/
async rename(id: ObjectId, filename: string): Promise<void> {
async rename(id: ObjectId, filename: string, options?: { timeoutMS: number }): Promise<void> {
const filter = { _id: id };
const update = { $set: { filename } };
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update);
const { matchedCount } = await this.s._filesCollection.updateOne(filter, update, options);
if (matchedCount === 0) {
throw new MongoRuntimeError(`File with id ${id} not found`);
}
}

/** Removes this bucket's files collection, followed by its chunks collection. */
async drop(): Promise<void> {
await this.s._filesCollection.drop();
await this.s._chunksCollection.drop();
async drop(options?: { timeoutMS: number }): Promise<void> {
const { timeoutMS } = resolveOptions(this.s.db, options);
let timeoutContext: CSOTTimeoutContext | undefined = undefined;

if (timeoutMS) {
timeoutContext = new CSOTTimeoutContext({
timeoutMS,
serverSelectionTimeoutMS: this.s.db.client.options.serverSelectionTimeoutMS
});
}

if (timeoutContext) {
await this.s._filesCollection.drop({ timeoutMS: timeoutContext.remainingTimeMS });
const remainingTimeMS = timeoutContext.getRemainingTimeMSOrThrow(
`Timed out after ${timeoutMS}ms`
);
await this.s._chunksCollection.drop({ timeoutMS: remainingTimeMS });
} else {
await this.s._filesCollection.drop();
await this.s._chunksCollection.drop();
}
}
}
Loading