diff --git a/lib/core/sdam/server.js b/lib/core/sdam/server.js index d37c008026f..5ce601331ac 100644 --- a/lib/core/sdam/server.js +++ b/lib/core/sdam/server.js @@ -1,16 +1,14 @@ 'use strict'; const EventEmitter = require('events'); +const ConnectionPool = require('../../cmap/connection_pool').ConnectionPool; const MongoError = require('../error').MongoError; -const Pool = require('../connection/pool'); const relayEvents = require('../utils').relayEvents; -const wireProtocol = require('../wireprotocol'); const BSON = require('../connection/utils').retrieveBSON(); const createClientInfo = require('../topologies/shared').createClientInfo; const Logger = require('../connection/logger'); const ServerDescription = require('./server_description').ServerDescription; const ReadPreference = require('../topologies/read_preference'); const Monitor = require('./monitor').Monitor; -const MongoParseError = require('../error').MongoParseError; const MongoNetworkError = require('../error').MongoNetworkError; const collationNotSupported = require('../utils').collationNotSupported; const debugOptions = require('../connection/utils').debugOptions; @@ -103,14 +101,27 @@ class Server extends EventEmitter { ]), // client metadata for the initial handshake clientInfo: createClientInfo(options), - // the connection pool - pool: null, // the server state state: STATE_CLOSED, credentials: options.credentials, topology }; + // create the connection pool + // NOTE: this used to happen in `connect`, we supported overriding pool options there + const addressParts = this.description.address.split(':'); + const poolOptions = Object.assign( + { host: addressParts[0], port: parseInt(addressParts[1], 10), bson: this.s.bson }, + options + ); + + this.s.pool = new ConnectionPool(poolOptions); + relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']); + this.s.pool.on('clusterTimeReceived', clusterTime => { + this.clusterTime = clusterTime; + }); + + // create the monitor this[kMonitor] = new Monitor(this, this.s.options); relayEvents(this[kMonitor], this, [ 'serverHeartbeatStarted', @@ -122,7 +133,7 @@ class Server extends EventEmitter { ]); this[kMonitor].on('resetConnectionPool', () => { - this.s.pool.reset(); + this.s.pool.clear(); }); this[kMonitor].on('serverHeartbeatFailed', event => { @@ -135,6 +146,11 @@ class Server extends EventEmitter { error: event.failure }) ); + + if (this.s.state === STATE_CONNECTING) { + this.emit('error', new MongoNetworkError(event.failure)); + this.destroy(); + } }); this[kMonitor].on('serverHeartbeatSucceeded', event => { @@ -144,6 +160,11 @@ class Server extends EventEmitter { roundTripTime: event.duration }) ); + + if (this.s.state === STATE_CONNECTING) { + stateTransition(this, STATE_CONNECTED); + this.emit('connect', this); + } }); } @@ -165,57 +186,21 @@ class Server extends EventEmitter { /** * Initiate server connect */ - connect(options) { - options = options || {}; - - // do not allow connect to be called on anything that's not disconnected - if (this.s.pool && !this.s.pool.isDisconnected() && !this.s.pool.isDestroyed()) { - throw new MongoError(`Server instance in invalid state ${this.s.pool.state}`); - } - - // create a pool - const addressParts = this.description.address.split(':'); - const poolOptions = Object.assign( - { host: addressParts[0], port: parseInt(addressParts[1], 10) }, - this.s.options, - options, - { bson: this.s.bson } - ); - - // NOTE: reconnect is explicitly false because of the server selection loop - poolOptions.reconnect = false; - poolOptions.legacyCompatMode = false; - - this.s.pool = new Pool(this, poolOptions); - - // setup listeners - this.s.pool.on('parseError', parseErrorEventHandler(this)); - - this.s.pool.on('drain', err => { - this.emit('error', err); - }); - - // it is unclear whether consumers should even know about these events - // this.s.pool.on('timeout', timeoutEventHandler(this)); - // this.s.pool.on('reconnect', reconnectEventHandler(this)); - // this.s.pool.on('reconnectFailed', errorEventHandler(this)); - - // relay all command monitoring events - relayEvents(this.s.pool, this, ['commandStarted', 'commandSucceeded', 'commandFailed']); - + connect() { stateTransition(this, STATE_CONNECTING); - - this.s.pool.connect(connectEventHandler(this)); + this[kMonitor].connect(); } /** * Destroy the server connection * + * @param {object} [options] Optional settings * @param {Boolean} [options.force=false] Force destroy the pool */ destroy(options, callback) { if (typeof options === 'function') (callback = options), (options = {}); options = Object.assign({}, { force: false }, options); + if (this.s.state === STATE_CLOSED) { if (typeof callback === 'function') { callback(); @@ -226,30 +211,14 @@ class Server extends EventEmitter { stateTransition(this, STATE_CLOSING); - const done = err => { + this[kMonitor].close(); + this.s.pool.close(options, err => { stateTransition(this, STATE_CLOSED); this.emit('closed'); if (typeof callback === 'function') { - callback(err, null); + callback(err); } - }; - - // close the monitor - this[kMonitor].close(); - - if (!this.s.pool) { - return done(); - } - - ['close', 'error', 'timeout', 'parseError', 'connect'].forEach(event => { - this.s.pool.removeAllListeners(event); }); - - if (this.s.monitorId) { - clearTimeout(this.s.monitorId); - } - - this.s.pool.destroy(options.force, done); } /** @@ -265,12 +234,13 @@ class Server extends EventEmitter { * * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1) * @param {object} cmd The command hash + * @param {object} [options] Optional settings * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.checkKeys=false] Specify if the bson parser should validate keys. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document. - * @param {ClientSession} [options.session=null] Session to use for the operation + * @param {ClientSession} [options.session] Session to use for the operation * @param {opResultCallback} callback A callback function */ command(ns, cmd, options, callback) { @@ -285,7 +255,7 @@ class Server extends EventEmitter { const error = basicReadValidations(this, options); if (error) { - return callback(error, null); + return callback(error); } // Clone the options @@ -308,19 +278,23 @@ class Server extends EventEmitter { return; } - wireProtocol.command(this, ns, cmd, options, (err, result) => { - if (err) { - if (options.session && err instanceof MongoNetworkError) { - options.session.serverSession.isDirty = true; - } + this.s.pool.withConnection((err, conn, cb) => { + if (err) return cb(err); - if (isSDAMUnrecoverableError(err, this)) { - this.emit('error', err); + conn.command(ns, cmd, options, (err, result) => { + if (err) { + if (options.session && err instanceof MongoNetworkError) { + options.session.serverSession.isDirty = true; + } + + if (isSDAMUnrecoverableError(err, this)) { + this.emit('error', err); + } } - } - callback(err, result); - }); + cb(err, result); + }); + }, callback); } /** @@ -337,19 +311,23 @@ class Server extends EventEmitter { return; } - wireProtocol.query(this, ns, cmd, cursorState, options, (err, result) => { - if (err) { - if (options.session && err instanceof MongoNetworkError) { - options.session.serverSession.isDirty = true; - } + this.s.pool.withConnection((err, conn, cb) => { + if (err) return cb(err); - if (isSDAMUnrecoverableError(err, this)) { - this.emit('error', err); + conn.query(ns, cmd, cursorState, options, (err, result) => { + if (err) { + if (options.session && err instanceof MongoNetworkError) { + options.session.serverSession.isDirty = true; + } + + if (isSDAMUnrecoverableError(err, this)) { + this.emit('error', err); + } } - } - callback(err, result); - }); + cb(err, result); + }); + }, callback); } /** @@ -366,19 +344,23 @@ class Server extends EventEmitter { return; } - wireProtocol.getMore(this, ns, cursorState, batchSize, options, (err, result) => { - if (err) { - if (options.session && err instanceof MongoNetworkError) { - options.session.serverSession.isDirty = true; - } + this.s.pool.withConnection((err, conn, cb) => { + if (err) return cb(err); - if (isSDAMUnrecoverableError(err, this)) { - this.emit('error', err); + conn.getMore(ns, cursorState, batchSize, options, (err, result) => { + if (err) { + if (options.session && err instanceof MongoNetworkError) { + options.session.serverSession.isDirty = true; + } + + if (isSDAMUnrecoverableError(err, this)) { + this.emit('error', err); + } } - } - callback(err, result); - }); + cb(err, result); + }); + }, callback); } /** @@ -397,15 +379,17 @@ class Server extends EventEmitter { return; } - wireProtocol.killCursors(this, ns, cursorState, (err, result) => { - if (err && isSDAMUnrecoverableError(err, this)) { - this.emit('error', err); - } + this.s.pool.withConnection((err, conn, cb) => { + if (err) return cb(err); - if (typeof callback === 'function') { - callback(err, result); - } - }); + conn.killCursors(ns, cursorState, (err, result) => { + if (err && isSDAMUnrecoverableError(err, this)) { + this.emit('error', err); + } + + cb(err, result); + }); + }, callback); } /** @@ -417,7 +401,7 @@ class Server extends EventEmitter { * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. - * @param {ClientSession} [options.session=null] Session to use for the operation + * @param {ClientSession} [options.session] Session to use for the operation * @param {opResultCallback} callback A callback function */ insert(ns, ops, options, callback) { @@ -433,7 +417,7 @@ class Server extends EventEmitter { * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. - * @param {ClientSession} [options.session=null] Session to use for the operation + * @param {ClientSession} [options.session] Session to use for the operation * @param {opResultCallback} callback A callback function */ update(ns, ops, options, callback) { @@ -449,7 +433,7 @@ class Server extends EventEmitter { * @param {object} [options.writeConcern={}] Write concern for the operation * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized. * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields. - * @param {ClientSession} [options.session=null] Session to use for the operation + * @param {ClientSession} [options.session] Session to use for the operation * @param {opResultCallback} callback A callback function */ remove(ns, ops, options, callback) { @@ -466,24 +450,7 @@ Object.defineProperty(Server.prototype, 'clusterTime', { } }); -function basicWriteValidations(server) { - if (!server.s.pool) { - return new MongoError('server instance is not connected'); - } - - if (server.s.pool.isDestroyed()) { - return new MongoError('server instance pool was destroyed'); - } - - return null; -} - function basicReadValidations(server, options) { - const error = basicWriteValidations(server, options); - if (error) { - return error; - } - if (options.readPreference && !(options.readPreference instanceof ReadPreference)) { return new MongoError('readPreference must be an instance of ReadPreference'); } @@ -504,83 +471,28 @@ function executeWriteOperation(args, options, callback) { return; } - const error = basicWriteValidations(server, options); - if (error) { - callback(error, null); - return; - } - if (collationNotSupported(server, options)) { callback(new MongoError(`server ${server.name} does not support collation`)); return; } - return wireProtocol[op](server, ns, ops, options, (err, result) => { - if (err) { - if (options.session && err instanceof MongoNetworkError) { - options.session.serverSession.isDirty = true; - } + server.s.pool.withConnection((err, conn, cb) => { + if (err) return cb(err); - if (isSDAMUnrecoverableError(err, server)) { - server.emit('error', err); - } - } - - callback(err, result); - }); -} - -function connectEventHandler(server) { - return function(err, conn) { - if (server.s.state === STATE_CLOSING || server.s.state === STATE_CLOSED) { - return; - } - - if (err) { - server.emit('error', new MongoNetworkError(err)); - - stateTransition(server, STATE_CLOSED); - server.emit('close'); - return; - } - - const ismaster = conn.ismaster; - server.s.lastIsMasterMS = conn.lastIsMasterMS; - if (conn.agreedCompressor) { - server.s.pool.options.agreedCompressor = conn.agreedCompressor; - } - - if (conn.zlibCompressionLevel) { - server.s.pool.options.zlibCompressionLevel = conn.zlibCompressionLevel; - } - - if (conn.ismaster.$clusterTime) { - const $clusterTime = conn.ismaster.$clusterTime; - server.s.sclusterTime = $clusterTime; - } - - // log the connection event if requested - if (server.s.logger.isInfo()) { - server.s.logger.info( - `server ${server.name} connected with ismaster [${JSON.stringify(ismaster)}]` - ); - } - - // start up the server monitor - // TODO: move this to `connect` when new connection pool is installed - server[kMonitor].connect(); + conn[op](ns, ops, options, (err, result) => { + if (err) { + if (options.session && err instanceof MongoNetworkError) { + options.session.serverSession.isDirty = true; + } - // we are connected and handshaked (guaranteed by the pool) - stateTransition(server, STATE_CONNECTED); - server.emit('connect', server); - }; -} + if (isSDAMUnrecoverableError(err, server)) { + server.emit('error', err); + } + } -function parseErrorEventHandler(server) { - return function(err) { - stateTransition(this, STATE_CLOSED); - server.emit('error', new MongoParseError(err)); - }; + cb(err, result); + }); + }, callback); } module.exports = {