diff --git a/src/internal/bolt-protocol-v3.js b/src/internal/bolt-protocol-v3.js index 1e5b27f18..3a28346e2 100644 --- a/src/internal/bolt-protocol-v3.js +++ b/src/internal/bolt-protocol-v3.js @@ -22,9 +22,14 @@ import { assertDatabaseIsEmpty } from './bolt-protocol-util' import { StreamObserver, LoginObserver, - ResultStreamObserver + ResultStreamObserver, + ProcedureRouteObserver } from './stream-observers' import { BOLT_PROTOCOL_V3 } from './constants' +import Bookmark from './bookmark' +import TxConfig from './tx-config' +const CONTEXT = 'context' +const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` const noOpObserver = new StreamObserver() @@ -183,4 +188,39 @@ export default class BoltProtocol extends BoltProtocolV2 { return observer } + + /** + * Request routing information + * + * @param {Object} param - + * @param {object} param.routingContext The routing context used to define the routing table. + * Multi-datacenter deployments is one of its use cases + * @param {string} param.databaseName The database name + * @param {Bookmark} params.sessionContext.bookmark The bookmark used for request the routing table + * @param {string} params.sessionContext.mode The session mode + * @param {string} params.sessionContext.database The database name used on the session + * @param {function()} params.sessionContext.afterComplete The session param used after the session closed + * @param {function(err: Error)} param.onError + * @param {function(RawRoutingTable)} param.onCompleted + * @returns {RouteObserver} the route observer + */ + requestRoutingInformation ({ + routingContext = {}, + sessionContext = {}, + onError, + onCompleted + }) { + const resultObserver = this.run( + CALL_GET_ROUTING_TABLE, + { [CONTEXT]: routingContext }, + { ...sessionContext, txConfig: TxConfig.empty() } + ) + + return new ProcedureRouteObserver({ + resultObserver, + connection: this._connection, + onError, + onCompleted + }) + } } diff --git a/src/internal/bolt-protocol-v4x0.js b/src/internal/bolt-protocol-v4x0.js index 0ed6e36c7..31ebddd22 100644 --- a/src/internal/bolt-protocol-v4x0.js +++ b/src/internal/bolt-protocol-v4x0.js @@ -18,8 +18,17 @@ */ import BoltProtocolV3 from './bolt-protocol-v3' import RequestMessage, { ALL } from './request-message' -import { ResultStreamObserver } from './stream-observers' +import { + ResultStreamObserver, + ProcedureRouteObserver +} from './stream-observers' import { BOLT_PROTOCOL_V4_0 } from './constants' +import Bookmark from './bookmark' +import TxConfig from './tx-config' + +const CONTEXT = 'context' +const DATABASE = 'database' +const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})` export default class BoltProtocol extends BoltProtocolV3 { get version () { @@ -119,4 +128,44 @@ export default class BoltProtocol extends BoltProtocolV3 { } _noOp () {} + + /** + * Request routing information + * + * @param {Object} param - + * @param {object} param.routingContext The routing context used to define the routing table. + * Multi-datacenter deployments is one of its use cases + * @param {string} param.databaseName The database name + * @param {Bookmark} params.sessionContext.bookmark The bookmark used for request the routing table + * @param {string} params.sessionContext.mode The session mode + * @param {string} params.sessionContext.database The database name used on the session + * @param {function()} params.sessionContext.afterComplete The session param used after the session closed + * @param {function(err: Error)} param.onError + * @param {function(RawRoutingTable)} param.onCompleted + * @returns {RouteObserver} the route observer + */ + requestRoutingInformation ({ + routingContext = {}, + databaseName = null, + sessionContext = {}, + initialAddress = null, + onError, + onCompleted + }) { + const resultObserver = this.run( + CALL_GET_ROUTING_TABLE_MULTI_DB, + { + [CONTEXT]: { ...routingContext, address: initialAddress }, + [DATABASE]: databaseName + }, + { ...sessionContext, txConfig: TxConfig.empty() } + ) + + return new ProcedureRouteObserver({ + resultObserver, + connection: this._connection, + onError, + onCompleted + }) + } } diff --git a/src/internal/bolt-protocol-v4x2.js b/src/internal/bolt-protocol-v4x2.js index fb11cf4fd..e24f3360e 100644 --- a/src/internal/bolt-protocol-v4x2.js +++ b/src/internal/bolt-protocol-v4x2.js @@ -20,17 +20,6 @@ import BoltProtocolV41 from './bolt-protocol-v4x1' import { BOLT_PROTOCOL_V4_2 } from './constants' export default class BoltProtocol extends BoltProtocolV41 { - /** - * @constructor - * @param {Connection} connection the connection. - * @param {Chunker} chunker the chunker. - * @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers. - * @param {Object} serversideRouting - */ - constructor (connection, chunker, disableLosslessIntegers, serversideRouting) { - super(connection, chunker, disableLosslessIntegers, serversideRouting) - } - get version () { return BOLT_PROTOCOL_V4_2 } diff --git a/src/internal/bolt-protocol-v4x3.js b/src/internal/bolt-protocol-v4x3.js new file mode 100644 index 000000000..18538638a --- /dev/null +++ b/src/internal/bolt-protocol-v4x3.js @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import BoltProtocolV42 from './bolt-protocol-v4x2' +import { BOLT_PROTOCOL_V4_3 } from './constants' +import RequestMessage from './request-message' +import { RouteObserver } from './stream-observers' + +export default class BoltProtocol extends BoltProtocolV42 { + get version () { + return BOLT_PROTOCOL_V4_3 + } + + /** + * Request routing information + * + * @param {Object} param - + * @param {object} param.routingContext The routing context used to define the routing table. + * Multi-datacenter deployments is one of its use cases + * @param {string} param.databaseName The database name + * @param {function(err: Error)} param.onError + * @param {function(RawRoutingTable)} param.onCompleted + * @returns {RouteObserver} the route observer + */ + + requestRoutingInformation ({ + routingContext = {}, + databaseName = null, + initialAddress = null, + onError, + onCompleted + }) { + const observer = new RouteObserver({ + connection: this._connection, + onError, + onCompleted + }) + + this._connection.write( + RequestMessage.route( + { ...routingContext, address: initialAddress }, + databaseName + ), + observer, + true + ) + + return observer + } +} diff --git a/src/internal/connection-provider-routing.js b/src/internal/connection-provider-routing.js index 6cafc2229..19b96f439 100644 --- a/src/internal/connection-provider-routing.js +++ b/src/internal/connection-provider-routing.js @@ -22,7 +22,6 @@ import { READ, WRITE } from '../driver' import Session from '../session' import RoutingTable from './routing-table' import Rediscovery from './rediscovery' -import { RoutingTableGetterFactory } from './routing-table-getter' import { HostNameResolver } from './node' import SingleConnectionProvider from './connection-provider-single' import PooledConnectionProvider from './connection-provider-pooled' @@ -65,9 +64,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider this._seedRouter = address this._routingTables = {} - this._rediscovery = new Rediscovery( - new RoutingTableGetterFactory(routingContext, address.toString()) - ) + this._rediscovery = new Rediscovery(routingContext, address.toString()) this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy( this._connectionPool ) diff --git a/src/internal/constants.js b/src/internal/constants.js index 8962be74c..0d3e6e3db 100644 --- a/src/internal/constants.js +++ b/src/internal/constants.js @@ -26,6 +26,7 @@ const BOLT_PROTOCOL_V3 = 3 const BOLT_PROTOCOL_V4_0 = 4.0 const BOLT_PROTOCOL_V4_1 = 4.1 const BOLT_PROTOCOL_V4_2 = 4.2 +const BOLT_PROTOCOL_V4_3 = 4.3 export { ACCESS_MODE_READ, @@ -35,5 +36,6 @@ export { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_1, - BOLT_PROTOCOL_V4_2 + BOLT_PROTOCOL_V4_2, + BOLT_PROTOCOL_V4_3 } diff --git a/src/internal/protocol-handshaker.js b/src/internal/protocol-handshaker.js index 77069f903..56a025186 100644 --- a/src/internal/protocol-handshaker.js +++ b/src/internal/protocol-handshaker.js @@ -25,7 +25,7 @@ import BoltProtocolV3 from './bolt-protocol-v3' import BoltProtocolV4x0 from './bolt-protocol-v4x0' import BoltProtocolV4x1 from './bolt-protocol-v4x1' import BoltProtocolV4x2 from './bolt-protocol-v4x2' - +import BoltProtocolV4x3 from './bolt-protocol-v4x3' const BOLT_MAGIC_PREAMBLE = 0x6060b017 export default class ProtocolHandshaker { @@ -132,6 +132,13 @@ export default class ProtocolHandshaker { this._disableLosslessIntegers, this._serversideRouting ) + case 4.3: + return new BoltProtocolV4x3( + this._connection, + this._chunker, + this._disableLosslessIntegers, + this._serversideRouting + ) default: throw newError('Unknown Bolt protocol version: ' + version) } @@ -149,7 +156,7 @@ function newHandshakeBuffer () { handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE) // proposed versions - handshakeBuffer.writeInt32((2 << 8) | 4) + handshakeBuffer.writeInt32((3 << 8) | 4) handshakeBuffer.writeInt32((1 << 8) | 4) handshakeBuffer.writeInt32(4) handshakeBuffer.writeInt32(3) diff --git a/src/internal/rediscovery.js b/src/internal/rediscovery.js index e7bab622a..704633069 100644 --- a/src/internal/rediscovery.js +++ b/src/internal/rediscovery.js @@ -17,17 +17,23 @@ * limitations under the License. */ import RoutingTable from './routing-table' +import RawRoutingTable from './routing-table-raw' import Session from '../session' -import { RoutingTableGetterFactory } from './routing-table-getter' import ServerAddress from './server-address' +import { newError, SERVICE_UNAVAILABLE } from '../error' + +const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound' +const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound' export default class Rediscovery { /** * @constructor - * @param {RoutingTableGetterFactory} routingTableGetterFactory the util to use. + * @param {object} routingContext + * @param {string} initialAddress */ - constructor (routingTableGetterFactory) { - this._routingTableGetterFactory = routingTableGetterFactory + constructor (routingContext, initialAddress) { + this._routingContext = routingContext + this._initialAddress = initialAddress } /** @@ -39,15 +45,55 @@ export default class Rediscovery { */ lookupRoutingTableOnRouter (session, database, routerAddress) { return session._acquireConnection(connection => { - const routingTableGetter = this._routingTableGetterFactory.create( - connection - ) - return routingTableGetter.get( + return this._requestRawRoutingTable( connection, + session, database, - routerAddress, - session - ) + routerAddress + ).then(rawRoutingTable => { + if (rawRoutingTable.isNull) { + return null + } + return RoutingTable.fromRawRoutingTable( + database, + routerAddress, + rawRoutingTable + ) + }) + }) + } + + _requestRawRoutingTable (connection, session, database, routerAddress) { + return new Promise((resolve, reject) => { + connection.protocol().requestRoutingInformation({ + routingContext: this._routingContext, + initialAddress: this._initialAddress, + databaseName: database, + sessionContext: { + bookmark: session._lastBookmark, + mode: session._mode, + database: session._database, + afterComplete: session._onComplete + }, + onCompleted: resolve, + onError: error => { + if (error.code === DATABASE_NOT_FOUND_CODE) { + reject(error) + } else if (error.code === PROCEDURE_NOT_FOUND_CODE) { + // throw when getServers procedure not found because this is clearly a configuration issue + reject( + newError( + `Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`, + SERVICE_UNAVAILABLE + ) + ) + } else { + // return nothing when failed to connect because code higher in the callstack is still able to retry with a + // different session towards a different router + resolve(RawRoutingTable.ofNull()) + } + } + }) }) } } diff --git a/src/internal/request-message.js b/src/internal/request-message.js index 03c74dc86..10098f32c 100644 --- a/src/internal/request-message.js +++ b/src/internal/request-message.js @@ -35,6 +35,7 @@ const GOODBYE = 0x02 // 0000 0010 // GOODBYE const BEGIN = 0x11 // 0001 0001 // BEGIN const COMMIT = 0x12 // 0001 0010 // COMMIT const ROLLBACK = 0x13 // 0001 0011 // ROLLBACK +const ROUTE = 0x66 // 0110 0110 // ROUTE const DISCARD = 0x2f // 0010 1111 // DISCARD const PULL = 0x3f // 0011 1111 // PULL @@ -209,6 +210,21 @@ export default class RequestMessage { () => `DISCARD ${JSON.stringify(metadata)}` ) } + + /** + * Generate the ROUTE message, this message is used to fetch the routing table from the server + * + * @param {object} routingContext The routing context used to define the routing table. Multi-datacenter deployments is one of its use cases + * @param {string} databaseName The name of the database to get the routing table for. + * @return {RequestMessage} the ROUTE message. + */ + static route (routingContext = {}, databaseName = null) { + return new RequestMessage( + ROUTE, + [routingContext, databaseName], + () => `ROUTE ${JSON.stringify(routingContext)} ${databaseName}` + ) + } } /** diff --git a/src/internal/routing-table-getter/index.js b/src/internal/routing-table-getter/index.js deleted file mode 100644 index 4fee9f968..000000000 --- a/src/internal/routing-table-getter/index.js +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import RoutingTableGetterFactory from './routing-table-getter-factory' - -export { RoutingTableGetterFactory } diff --git a/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js b/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js deleted file mode 100644 index 9227dcf78..000000000 --- a/src/internal/routing-table-getter/routing-procedure-runner-multi-database.js +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import RoutingProcedureRunner from './routing-procedure-runner' - -const CONTEXT = 'context' -const DATABASE = 'database' -const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})` - -/** - * Runs the Multi-Database procedure to get the Routing Table - */ -export default class MultiDatabaseRoutingProcedureRunner extends RoutingProcedureRunner { - constructor (initialAddress) { - super() - this._initialAddress = initialAddress - } - - /** - * Run the procedure - * - * @param {Connection} connection The connection use - * @param {string} database the database - * @param {string} routerAddress the router address - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Result} the result of the query - */ - run (connection, database, context, session) { - return this._runQuery( - connection, - CALL_GET_ROUTING_TABLE_MULTI_DB, - { - context: { - ...context, - address: this._initialAddress - }, - database: database || null - }, - session - ) - } -} diff --git a/src/internal/routing-table-getter/routing-procedure-runner-single-database.js b/src/internal/routing-table-getter/routing-procedure-runner-single-database.js deleted file mode 100644 index a9c079eea..000000000 --- a/src/internal/routing-table-getter/routing-procedure-runner-single-database.js +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import RoutingProcedureRunner from './routing-procedure-runner' - -const CONTEXT = 'context' -const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})` - -/** - * Runs the Single-Database procedure to get the Routing Table - */ -export default class SingleDatabaseRoutingProcedureRunner extends RoutingProcedureRunner { - /** - * Run the procedure - * - * @param {Connection} connection The connection use - * @param {string} database the database - * @param {string} routerAddress the router address - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Result} the result of the query - */ - run (connection, database, context, session) { - return this._runQuery( - connection, - CALL_GET_ROUTING_TABLE, - { context }, - session - ) - } -} diff --git a/src/internal/routing-table-getter/routing-procedure-runner.js b/src/internal/routing-table-getter/routing-procedure-runner.js deleted file mode 100644 index 14c844e52..000000000 --- a/src/internal/routing-table-getter/routing-procedure-runner.js +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import Result from '../../result' -import TxConfig from '../tx-config' - -export default class RoutingProcedureRunner { - /** - * @param {Connection} connection the connection - * @param {string} database the database - * @param {object} routerContext the router context - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Result} the result of the query - */ - run (connection, database, routerContext, session) { - throw new Error('not implemented') - } - - /** - * Run query using the connection - * @param {Connection} connection the connectiom - * @param {string} query the query - * @param {object} params the query params - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Result} the result of the query - */ - _runQuery (connection, query, params, session) { - const resultOberserver = connection.protocol().run(query, params, { - bookmark: session._lastBookmark, - txConfig: TxConfig.empty(), - mode: session._mode, - database: session._database, - afterComplete: session._onComplete - }) - return new Result(Promise.resolve(resultOberserver)) - } -} diff --git a/src/internal/routing-table-getter/routing-table-getter-factory.js b/src/internal/routing-table-getter/routing-table-getter-factory.js deleted file mode 100644 index 5d29d6811..000000000 --- a/src/internal/routing-table-getter/routing-table-getter-factory.js +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import { BOLT_PROTOCOL_V4_0 } from '../constants' -import Connection from '../connection' -import ProcedureRoutingTableGetter from './routing-table-getter-procedure' -import SingleDatabaseRoutingProcedureRunner from './routing-procedure-runner-single-database' -import MultiDatabaseRoutingProcedureRunner from './routing-procedure-runner-multi-database' - -/** - * Constructs the RoutingTableGetter according to the correct protocol version. - */ -export default class RoutingTableGetterFactory { - /** - * Constructor - * @param {Object} routingContext Context which the be used to define the routing table - * @param {string} initialAddress The address that the driver is connecting to, - * used by routing as a fallback when routing and clustering isn't configured. - */ - constructor (routingContext, initialAddress) { - this._routingContext = routingContext - this._initialAddress = initialAddress - } - - /** - * Creates the RoutingTableGetter using the given session and database - * - * @param {Connection} connection the connection to use - * @param {string} database the database name - * @param {string} routerAddress the URL of the router. - * @returns {ProcedureRoutingTableGetter} The routing table getter - */ - create (connection) { - const runner = - connection.protocol().version < BOLT_PROTOCOL_V4_0 - ? new SingleDatabaseRoutingProcedureRunner() - : new MultiDatabaseRoutingProcedureRunner(this._initialAddress) - - return new ProcedureRoutingTableGetter(this._routingContext, runner) - } -} diff --git a/src/internal/routing-table-getter/routing-table-getter-procedure.js b/src/internal/routing-table-getter/routing-table-getter-procedure.js deleted file mode 100644 index 9c6f001b6..000000000 --- a/src/internal/routing-table-getter/routing-table-getter-procedure.js +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import { newError, PROTOCOL_ERROR, SERVICE_UNAVAILABLE } from '../../error' -import ServerAddress from '../server-address' -import RoutingTable from '../routing-table' -import Integer, { int } from '../../integer' -import Session from '../../session' -import RoutingProcedureRunner from './routing-procedure-runner' - -const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound' -const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound' - -/** - * Get the routing table by running the procedure - */ -export default class ProcedureRoutingTableGetter { - /** - * Constructor - * @param {Object} routingContext - * @param {RoutingProcedureRunner} runner the procedure runner - */ - constructor (routingContext, runner) { - this._runner = runner - this._routingContext = routingContext - } - - /** - * Get the routing table by running the routing table procedure - * - * @param {Connection} connection The connection use - * @param {string} database the database - * @param {ServerAddress} routerAddress the router address - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Promise} The routing table - */ - async get (connection, database, routerAddress, session) { - const records = await this._runProcedure( - connection, - database, - routerAddress, - session - ) - if (records === null) { - return null // it should go to another layer to retry - } - - if (records.length !== 1) { - throw newError( - 'Illegal response from router "' + - routerAddress + - '". ' + - 'Received ' + - records.length + - ' records but expected only one.\n' + - JSON.stringify(records), - PROTOCOL_ERROR - ) - } - - const record = records[0] - - const expirationTime = this._parseTtl(record, routerAddress) - const { routers, readers, writers } = this._parseServers( - record, - routerAddress - ) - - assertNonEmpty(routers, 'routers', routerAddress) - assertNonEmpty(readers, 'readers', routerAddress) - - return new RoutingTable({ - database, - routers, - readers, - writers, - expirationTime - }) - } - - /** - * Run the routing query and fetch the records - * - * @param {Connection} connection the connection - * @param {string} database the database - * @param {string} routerAddress the router address - * @param {Session} session the session which was used to get the connection, - * it will be used to get lastBookmark and other properties - * - * @returns {Promise} the list of records fetched - */ - async _runProcedure (connection, database, routerAddress, session) { - try { - const result = await this._runner.run( - connection, - database, - this._routingContext, - session - ) - return result.records - } catch (error) { - if (error.code === DATABASE_NOT_FOUND_CODE) { - throw error - } else if (error.code === PROCEDURE_NOT_FOUND_CODE) { - // throw when getServers procedure not found because this is clearly a configuration issue - throw newError( - `Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`, - SERVICE_UNAVAILABLE - ) - } else { - // return nothing when failed to connect because code higher in the callstack is still able to retry with a - // different session towards a different router - return null - } - } - } - - /** - * Parse the ttls from the record and return it - * - * @param {Record} record the record - * @param {string} routerAddress the router address - * @returns {number} the ttl - */ - _parseTtl (record, routerAddress) { - try { - const now = int(Date.now()) - const expires = int(record.get('ttl')) - .multiply(1000) - .add(now) - // if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed - if (expires.lessThan(now)) { - return Integer.MAX_VALUE - } - return expires - } catch (error) { - throw newError( - `Unable to parse TTL entry from router ${routerAddress} from record:\n${JSON.stringify( - record - )}\nError message: ${error.message}`, - PROTOCOL_ERROR - ) - } - } - - /** - * Parse server from the Record. - * - * @param {Record} record the record - * @param {string} routerAddress the router address - * @returns {Object} The object with the list of routers, readers and writers - */ - _parseServers (record, routerAddress) { - try { - const servers = record.get('servers') - - let routers = [] - let readers = [] - let writers = [] - - servers.forEach(server => { - const role = server.role - const addresses = server.addresses - - if (role === 'ROUTE') { - routers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else if (role === 'WRITE') { - writers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else if (role === 'READ') { - readers = parseArray(addresses).map(address => - ServerAddress.fromUrl(address) - ) - } else { - throw newError('Unknown server role "' + role + '"', PROTOCOL_ERROR) - } - }) - - return { - routers: routers, - readers: readers, - writers: writers - } - } catch (error) { - throw newError( - `Unable to parse servers entry from router ${routerAddress} from record:\n${JSON.stringify( - record - )}\nError message: ${error.message}`, - PROTOCOL_ERROR - ) - } - } -} - -/** - * Assset if serverAddressesArray is not empty, throws and PROTOCOL_ERROR otherwise - * - * @param {string[]} serverAddressesArray array of addresses - * @param {string} serversName the server name - * @param {string} routerAddress the router address - */ -function assertNonEmpty (serverAddressesArray, serversName, routerAddress) { - if (serverAddressesArray.length === 0) { - throw newError( - 'Received no ' + serversName + ' from router ' + routerAddress, - PROTOCOL_ERROR - ) - } -} - -function parseArray (addresses) { - if (!Array.isArray(addresses)) { - throw new TypeError('Array expected but got: ' + addresses) - } - return Array.from(addresses) -} diff --git a/src/internal/routing-table-raw.js b/src/internal/routing-table-raw.js new file mode 100644 index 000000000..ea5854c42 --- /dev/null +++ b/src/internal/routing-table-raw.js @@ -0,0 +1,122 @@ +import Record from '../record' + +/** + * Represente the raw version of the routing table + */ +export default class RawRoutingTable { + /** + * Constructs the raw routing table for Record based result + * @param {record} record The record which will be used get the raw routing table + * @returns {RawRoutingTable} The raw routing table + */ + static ofRecord (record) { + if (record === null) { + return RawRoutingTable.ofNull() + } + return new RecordRawRoutingTable(record) + } + + /** + * Constructs the raw routing table for Success result for a Routing Message + * @param {object} response The result + * @returns {RawRoutingTable} The raw routing table + */ + static ofMessageResponse (response) { + if (response === null) { + return RawRoutingTable.ofNull() + } + return new ResponseRawRoutingTable(response) + } + + /** + * Construct the raw routing table of a null response + * + * @returns {RawRoutingTable} the raw routing table + */ + static ofNull () { + return new NullRawRoutingTable() + } + + /** + * Get raw ttl + * + * @returns {number|string} ttl Time to live + */ + get ttl () { + throw new Error('Not implemented') + } + + /** + * + * @typedef {Object} ServerRole + * @property {string} role the role of the address on the cluster + * @property {string[]} addresses the address within the role + * + * @return {ServerRole[]} list of servers addresses + */ + get servers () { + throw new Error('Not implemented') + } + + /** + * Indicates the result is null + * + * @returns {boolean} Is null + */ + get isNull () { + throw new Error('Not implemented') + } +} + +/** + * Get the raw routing table information from route message response + */ +class ResponseRawRoutingTable extends RawRoutingTable { + constructor (response) { + super() + this._response = response + } + + get ttl () { + return this._response.rt.ttl + } + + get servers () { + return this._response.rt.servers + } + + get isNull () { + return this._response === null + } +} + +/** + * Null routing table + */ +class NullRawRoutingTable extends RawRoutingTable { + get isNull () { + return true + } +} + +/** + * Get the raw routing table information from the record + */ +class RecordRawRoutingTable extends RawRoutingTable { + constructor (record) { + super() + this._record = record + } + + get ttl () { + return this._record.get('ttl') + } + + get servers () { + return this._record.get('servers') + } + + get isNull () { + return this._record === null + } +} diff --git a/src/internal/routing-table.js b/src/internal/routing-table.js index 7f5a344fb..e5ad0db9f 100644 --- a/src/internal/routing-table.js +++ b/src/internal/routing-table.js @@ -16,11 +16,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { int } from '../integer' +import Integer, { int } from '../integer' import { READ, WRITE } from '../driver' +import ServerAddress from './server-address' +import { PROTOCOL_ERROR, newError } from '../error' const MIN_ROUTERS = 1 +/** + * The routing table object used to determine the role of the servers in the driver. + */ export default class RoutingTable { constructor ({ database, routers, readers, writers, expirationTime } = {}) { this.database = database @@ -31,6 +36,18 @@ export default class RoutingTable { this.expirationTime = expirationTime || int(0) } + /** + * Create a valid routing table from a raw object + * + * @param {string} database the database name. It is used for logging purposes + * @param {ServerAddress} routerAddress The router address, it is used for loggin purposes + * @param {RawRoutingTable} rawRoutingTable Method used to get the raw routing table to be processed + * @param {RoutingTable} The valid Routing Table + */ + static fromRawRoutingTable (database, routerAddress, rawRoutingTable) { + return createValidRoutingTable(database, routerAddress, rawRoutingTable) + } + forget (address) { // Don't remove it from the set of routers, since that might mean we lose our ability to re-discover, // just remove it from the set of readers and writers, so that we don't use it for actual work without @@ -98,3 +115,132 @@ export default class RoutingTable { function removeFromArray (array, element) { return array.filter(item => item.asKey() !== element.asKey()) } + +/** + * Create a valid routing table from a raw object + * + * @param {string} database the database name. It is used for logging purposes + * @param {ServerAddress} routerAddress The router address, it is used for loggin purposes + * @param {RawRoutingTable} rawRoutingTable Method used to get the raw routing table to be processed + * @param {RoutingTable} The valid Routing Table + */ +export function createValidRoutingTable ( + database, + routerAddress, + rawRoutingTable +) { + const expirationTime = calculateExpirationTime(rawRoutingTable, routerAddress) + const { routers, readers, writers } = parseServers( + rawRoutingTable, + routerAddress + ) + + assertNonEmpty(routers, 'routers', routerAddress) + assertNonEmpty(readers, 'readers', routerAddress) + + return new RoutingTable({ + database, + routers, + readers, + writers, + expirationTime + }) +} + +/** + * Parse server from the RawRoutingTable. + * + * @param {RawRoutingTable} rawRoutingTable the raw routing table + * @param {string} routerAddress the router address + * @returns {Object} The object with the list of routers, readers and writers + */ +function parseServers (rawRoutingTable, routerAddress) { + try { + let routers = [] + let readers = [] + let writers = [] + + rawRoutingTable.servers.forEach(server => { + const role = server.role + const addresses = server.addresses + + if (role === 'ROUTE') { + routers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } else if (role === 'WRITE') { + writers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } else if (role === 'READ') { + readers = parseArray(addresses).map(address => + ServerAddress.fromUrl(address) + ) + } + }) + + return { + routers: routers, + readers: readers, + writers: writers + } + } catch (error) { + throw newError( + `Unable to parse servers entry from router ${routerAddress} from addresses:\n${JSON.stringify( + rawRoutingTable.servers + )}\nError message: ${error.message}`, + PROTOCOL_ERROR + ) + } +} + +/** + * Call the expiration time using the ttls from the raw routing table and return it + * + * @param {RawRoutingTable} rawRoutingTable the routing table + * @param {string} routerAddress the router address + * @returns {number} the ttl + */ +function calculateExpirationTime (rawRoutingTable, routerAddress) { + try { + const now = int(Date.now()) + const expires = int(rawRoutingTable.ttl) + .multiply(1000) + .add(now) + // if the server uses a really big expire time like Long.MAX_VALUE this may have overflowed + if (expires.lessThan(now)) { + return Integer.MAX_VALUE + } + return expires + } catch (error) { + throw newError( + `Unable to parse TTL entry from router ${routerAddress} from raw routing table:\n${JSON.stringify( + rawRoutingTable + )}\nError message: ${error.message}`, + PROTOCOL_ERROR + ) + } +} + +/** + * Assert if serverAddressesArray is not empty, throws and PROTOCOL_ERROR otherwise + * + * @param {string[]} serverAddressesArray array of addresses + * @param {string} serversName the server name + * @param {string} routerAddress the router address + */ +function assertNonEmpty (serverAddressesArray, serversName, routerAddress) { + if (serverAddressesArray.length === 0) { + throw newError( + 'Received no ' + serversName + ' from router ' + routerAddress, + PROTOCOL_ERROR + ) + } +} + +function parseArray (addresses) { + if (!Array.isArray(addresses)) { + throw new TypeError('Array expected but got: ' + addresses) + } + return Array.from(addresses) +} diff --git a/src/internal/stream-observers.js b/src/internal/stream-observers.js index f6ee7af56..69b86b07b 100644 --- a/src/internal/stream-observers.js +++ b/src/internal/stream-observers.js @@ -21,6 +21,7 @@ import Connection from './connection' import { newError, PROTOCOL_ERROR } from '../error' import Integer from '../integer' import { ALL } from './request-message' +import RawRoutingTable from './routing-table-raw' class StreamObserver { onNext (rawRecord) {} @@ -512,6 +513,96 @@ class CompletedObserver extends ResultStreamObserver { } } +class ProcedureRouteObserver extends StreamObserver { + constructor ({ resultObserver, connection, onError, onCompleted }) { + super() + + this._resultObserver = resultObserver + this._onError = onError + this._onCompleted = onCompleted + this._connection = connection + this._records = [] + resultObserver.subscribe(this) + } + + onNext (record) { + this._records.push(record) + } + + onError (error) { + if (error.code === PROTOCOL_ERROR) { + this._connection._handleProtocolError(error.message) + } + + if (this._onError) { + this._onError(error) + } + } + + onCompleted () { + if (this._records !== null && this._records.length !== 1) { + this.onError( + newError( + 'Illegal response from router. Received ' + + this._records.length + + ' records but expected only one.\n' + + JSON.stringify(this._records), + PROTOCOL_ERROR + ) + ) + return + } + + if (this._onCompleted) { + console.log('onCompeled exsits', this._records[0]) + this._onCompleted(RawRoutingTable.ofRecord(this._records[0])) + } + } +} + +class RouteObserver extends StreamObserver { + /** + * + * @param {Object} param - + * @param {Connection} param.connection + * @param {function(err: Error)} param.onError + * @param {function(RawRoutingTable)} param.onCompleted + */ + constructor ({ connection, onError, onCompleted } = {}) { + super() + + this._connection = connection + this._onError = onError + this._onCompleted = onCompleted + } + + onNext (record) { + this.onError( + newError( + 'Received RECORD when resetting: received record is: ' + + JSON.stringify(record), + PROTOCOL_ERROR + ) + ) + } + + onError (error) { + if (error.code === PROTOCOL_ERROR) { + this._connection._handleProtocolError(error.message) + } + + if (this._onError) { + this._onError(error) + } + } + + onCompleted (metadata) { + if (this._onCompleted) { + this._onCompleted(RawRoutingTable.ofMessageResponse(metadata)) + } + } +} + const _states = { READY_STREAMING: { // async start state @@ -582,5 +673,7 @@ export { LoginObserver, ResetObserver, FailedObserver, - CompletedObserver + CompletedObserver, + RouteObserver, + ProcedureRouteObserver } diff --git a/test/internal/bolt-protocol-v3.test.js b/test/internal/bolt-protocol-v3.test.js index 26be2ad23..a8d52067e 100644 --- a/test/internal/bolt-protocol-v3.test.js +++ b/test/internal/bolt-protocol-v3.test.js @@ -23,6 +23,10 @@ import utils from './test-utils' import Bookmark from '../../src/internal/bookmark' import TxConfig from '../../src/internal/tx-config' import { WRITE } from '../../src/driver' +import { + ProcedureRouteObserver, + ResultStreamObserver +} from '../../src/internal/stream-observers' describe('#unit BoltProtocolV3', () => { beforeEach(() => { @@ -151,6 +155,38 @@ describe('#unit BoltProtocolV3', () => { expect(protocol.version).toBe(3) }) + it('should request the routing table from the correct procedure', () => { + const expectedResultObserver = new ResultStreamObserver() + const protocol = new SpiedBoltProtocolV3(expectedResultObserver) + const routingContext = { abc: 'context ' } + const sessionContext = { bookmark: 'book' } + const onError = () => {} + const onCompleted = () => {} + + const observer = protocol.requestRoutingInformation({ + routingContext, + sessionContext, + onError, + onCompleted + }) + + expect(observer).toEqual( + new ProcedureRouteObserver({ + resultObserver: expectedResultObserver, + connection: null, + onCompleted, + onError + }) + ) + + expect(protocol._run.length).toEqual(1) + expect(protocol._run[0]).toEqual([ + 'CALL dbms.cluster.routing.getRoutingTable($context)', + { context: routingContext }, + { ...sessionContext, txConfig: TxConfig.empty() } + ]) + }) + describe('Bolt V4', () => { /** * @param {function(protocol: BoltProtocolV3)} fn @@ -186,3 +222,16 @@ describe('#unit BoltProtocolV3', () => { }) }) }) + +class SpiedBoltProtocolV3 extends BoltProtocolV3 { + constructor (resultObserver) { + super(null, null, false) + this._run = [] + this._resultObserver = resultObserver + } + + run () { + this._run.push([...arguments]) + return this._resultObserver + } +} diff --git a/test/internal/bolt-protocol-v4x0.test.js b/test/internal/bolt-protocol-v4x0.test.js index 8c8182ae8..2e2f3c932 100644 --- a/test/internal/bolt-protocol-v4x0.test.js +++ b/test/internal/bolt-protocol-v4x0.test.js @@ -23,6 +23,10 @@ import utils from './test-utils' import Bookmark from '../../src/internal/bookmark' import TxConfig from '../../src/internal/tx-config' import { WRITE } from '../../src/driver' +import { + ProcedureRouteObserver, + ResultStreamObserver +} from '../../src/internal/stream-observers' describe('#unit BoltProtocolV4x0', () => { beforeEach(() => { @@ -100,4 +104,56 @@ describe('#unit BoltProtocolV4x0', () => { expect(protocol.version).toBe(4) }) + + it('should request the routing table from the correct procedure', () => { + const expectedResultObserver = new ResultStreamObserver() + const protocol = new SpiedBoltProtocolV4x0(expectedResultObserver) + const routingContext = { abc: 'context ' } + const sessionContext = { bookmark: 'book' } + const databaseName = 'the name' + const onError = () => {} + const onCompleted = () => {} + const initialAddress = 'localhost:1234' + + const observer = protocol.requestRoutingInformation({ + routingContext, + sessionContext, + databaseName, + initialAddress, + onError, + onCompleted + }) + + expect(observer).toEqual( + new ProcedureRouteObserver({ + resultObserver: expectedResultObserver, + connection: null, + onCompleted, + onError + }) + ) + + expect(protocol._run.length).toEqual(1) + expect(protocol._run[0]).toEqual([ + 'CALL dbms.routing.getRoutingTable($context, $database)', + { + context: { ...routingContext, address: initialAddress }, + database: databaseName + }, + { ...sessionContext, txConfig: TxConfig.empty() } + ]) + }) }) + +class SpiedBoltProtocolV4x0 extends BoltProtocolV4x0 { + constructor (resultObserver) { + super(null, null, false) + this._run = [] + this._resultObserver = resultObserver + } + + run () { + this._run.push([...arguments]) + return this._resultObserver + } +} diff --git a/test/internal/bolt-protocol-v4x3.test.js b/test/internal/bolt-protocol-v4x3.test.js new file mode 100644 index 000000000..b8e2cdcbb --- /dev/null +++ b/test/internal/bolt-protocol-v4x3.test.js @@ -0,0 +1,204 @@ +/** + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' +import RequestMessage from '../../src/internal/request-message' +import utils from './test-utils' +import Bookmark from '../../src/internal/bookmark' +import TxConfig from '../../src/internal/tx-config' +import { WRITE } from '../../src/driver' +import { RouteObserver } from '../../src/internal/stream-observers' + +describe('#unit BoltProtocolV4x3', () => { + beforeEach(() => { + jasmine.addMatchers(utils.matchers) + }) + + it('should request routing information', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + const routingContext = { someContextParam: 'value' } + const databaseName = 'name' + + const observer = protocol.requestRoutingInformation({ + routingContext, + databaseName + }) + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage( + RequestMessage.route({ ...routingContext, address: null }, databaseName) + ) + expect(recorder.observers).toEqual([observer]) + expect(observer).toEqual(jasmine.any(RouteObserver)) + expect(recorder.flushes).toEqual([true]) + }) + + it('should run a query', () => { + const database = 'testdb' + const bookmark = new Bookmark([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const query = 'RETURN $x, $y' + const parameters = { x: 'x', y: 'y' } + + const observer = protocol.run(query, parameters, { + bookmark, + txConfig, + database, + mode: WRITE + }) + + recorder.verifyMessageCount(2) + + expect(recorder.messages[0]).toBeMessage( + RequestMessage.runWithMetadata(query, parameters, { + bookmark, + txConfig, + database, + mode: WRITE + }) + ) + expect(recorder.messages[1]).toBeMessage(RequestMessage.pull()) + expect(recorder.observers).toEqual([observer, observer]) + expect(recorder.flushes).toEqual([false, true]) + }) + it('should begin a transaction', () => { + const database = 'testdb' + const bookmark = new Bookmark([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const observer = protocol.beginTransaction({ + bookmark, + txConfig, + database, + mode: WRITE + }) + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage( + RequestMessage.begin({ bookmark, txConfig, database, mode: WRITE }) + ) + expect(recorder.observers).toEqual([observer]) + expect(recorder.flushes).toEqual([true]) + }) + + it('should return correct bolt version number', () => { + const protocol = new BoltProtocolV4x3(null, null, false) + + expect(protocol.version).toBe(4.3) + }) + + it('should update metadata', () => { + const metadata = { t_first: 1, t_last: 2, db_hits: 3, some_other_key: 4 } + const protocol = new BoltProtocolV4x3(null, null, false) + + const transformedMetadata = protocol.transformMetadata(metadata) + + expect(transformedMetadata).toEqual({ + result_available_after: 1, + result_consumed_after: 2, + db_hits: 3, + some_other_key: 4 + }) + }) + + it('should initialize connection', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const clientName = 'js-driver/1.2.3' + const authToken = { username: 'neo4j', password: 'secret' } + + const observer = protocol.initialize({ userAgent: clientName, authToken }) + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage( + RequestMessage.hello(clientName, authToken) + ) + expect(recorder.observers).toEqual([observer]) + expect(recorder.flushes).toEqual([true]) + }) + + it('should begin a transaction', () => { + const bookmark = new Bookmark([ + 'neo4j:bookmark:v1:tx1', + 'neo4j:bookmark:v1:tx2' + ]) + const txConfig = new TxConfig({ + timeout: 5000, + metadata: { x: 1, y: 'something' } + }) + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const observer = protocol.beginTransaction({ + bookmark, + txConfig, + mode: WRITE + }) + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage( + RequestMessage.begin({ bookmark, txConfig, mode: WRITE }) + ) + expect(recorder.observers).toEqual([observer]) + expect(recorder.flushes).toEqual([true]) + }) + + it('should commit', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const observer = protocol.commitTransaction() + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage(RequestMessage.commit()) + expect(recorder.observers).toEqual([observer]) + expect(recorder.flushes).toEqual([true]) + }) + + it('should rollback', () => { + const recorder = new utils.MessageRecordingConnection() + const protocol = new BoltProtocolV4x3(recorder, null, false) + + const observer = protocol.rollbackTransaction() + + recorder.verifyMessageCount(1) + expect(recorder.messages[0]).toBeMessage(RequestMessage.rollback()) + expect(recorder.observers).toEqual([observer]) + expect(recorder.flushes).toEqual([true]) + }) +}) diff --git a/test/internal/connection-channel.test.js b/test/internal/connection-channel.test.js index ed57e600b..630c63c62 100644 --- a/test/internal/connection-channel.test.js +++ b/test/internal/connection-channel.test.js @@ -123,12 +123,12 @@ describe('#integration ChannelConnection', () => { connection._negotiateProtocol() const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x2 = '00 00 02 04' + const protocolVersion4x3 = '00 00 03 04' const protocolVersion4x1 = '00 00 01 04' const protocolVersion4x0 = '00 00 00 04' const protocolVersion3 = '00 00 00 03' expect(channel.toHex()).toBe( - `${boltMagicPreamble} ${protocolVersion4x2} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` + `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` ) }) diff --git a/test/internal/fake-connection.js b/test/internal/fake-connection.js index 198aa26fd..a0668527b 100644 --- a/test/internal/fake-connection.js +++ b/test/internal/fake-connection.js @@ -40,6 +40,7 @@ export default class FakeConnection extends Connection { this._open = true this._id = 0 this._databaseId = null + this._requestRoutingInformationMock = null this.creationTimestamp = Date.now() this.resetInvoked = 0 @@ -49,6 +50,9 @@ export default class FakeConnection extends Connection { this.seenProtocolOptions = [] this._server = {} this.protocolVersion = undefined + this.protocolErrorsHandled = 0 + this.seenProtocolErrors = [] + this.seenRequestRoutingInformation = [] } get id () { @@ -83,6 +87,12 @@ export default class FakeConnection extends Connection { this.seenParameters.push(parameters) this.seenProtocolOptions.push(protocolOptions) }, + requestRoutingInformation: params => { + this.seenRequestRoutingInformation.push(params) + if (this._requestRoutingInformationMock) { + this._requestRoutingInformationMock(params) + } + }, version: this.protocolVersion } } @@ -113,6 +123,11 @@ export default class FakeConnection extends Connection { return this.resetInvoked === times && this.releaseInvoked === times } + _handleProtocolError (message) { + this.protocolErrorsHandled++ + this.seenProtocolErrors.push(message) + } + withServerVersion (version) { this.version = version const serverVersion = ServerVersion.fromString(version) @@ -142,6 +157,11 @@ export default class FakeConnection extends Connection { return this } + withRequestRoutingInformationMock (requestRoutingInformationMock) { + this._requestRoutingInformationMock = requestRoutingInformationMock + return this + } + closed () { this._open = false return this diff --git a/test/internal/protocol-handshaker.test.js b/test/internal/protocol-handshaker.test.js index 6d62963ae..de2ca21dc 100644 --- a/test/internal/protocol-handshaker.test.js +++ b/test/internal/protocol-handshaker.test.js @@ -20,6 +20,8 @@ import ProtocolHandshaker from '../../src/internal/protocol-handshaker' import Logger from '../../src/internal/logger' import BoltProtocol from '../../src/internal/bolt-protocol-v1' +import BoltProtocolV4x3 from '../../src/internal/bolt-protocol-v4x3' + import { alloc } from '../../src/internal/node' describe('#unit ProtocolHandshaker', () => { @@ -42,13 +44,13 @@ describe('#unit ProtocolHandshaker', () => { expect(writtenBuffers.length).toEqual(1) const boltMagicPreamble = '60 60 b0 17' - const protocolVersion4x2 = '00 00 02 04' + const protocolVersion4x3 = '00 00 03 04' const protocolVersion4x1 = '00 00 01 04' const protocolVersion4x0 = '00 00 00 04' const protocolVersion3 = '00 00 00 03' expect(writtenBuffers[0].toHex()).toEqual( - `${boltMagicPreamble} ${protocolVersion4x2} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` + `${boltMagicPreamble} ${protocolVersion4x3} ${protocolVersion4x1} ${protocolVersion4x0} ${protocolVersion3}` ) }) @@ -71,6 +73,26 @@ describe('#unit ProtocolHandshaker', () => { expect(protocol instanceof BoltProtocol).toBeTruthy() }) + it('should create protocol 4.3', () => { + const handshaker = new ProtocolHandshaker( + null, + null, + null, + false, + Logger.noOp() + ) + + // buffer with Bolt V4.3 + const buffer = handshakeResponse(4, 3) + + const protocol = handshaker.createNegotiatedProtocol(buffer) + + expect(protocol).toBeDefined() + expect(protocol).not.toBeNull() + expect(protocol.version).toEqual(4.3) + expect(protocol instanceof BoltProtocolV4x3).toBeTruthy() + }) + it('should fail to create protocol from invalid version', () => { const handshaker = new ProtocolHandshaker( null, @@ -106,9 +128,9 @@ describe('#unit ProtocolHandshaker', () => { * @param {number} version * @return {BaseBuffer} */ -function handshakeResponse (version) { +function handshakeResponse (version, minor = 0) { const buffer = alloc(4) - buffer.writeInt32(version) + buffer.writeInt32((minor << 8) | version) buffer.reset() return buffer } diff --git a/test/internal/rediscovery.test.js b/test/internal/rediscovery.test.js index c3503a40a..6b3fbfa06 100644 --- a/test/internal/rediscovery.test.js +++ b/test/internal/rediscovery.test.js @@ -17,134 +17,254 @@ * limitations under the License. */ +import RawRoutingTable from '../../src/internal/routing-table-raw' import Rediscovery from '../../src/internal/rediscovery' import RoutingTable from '../../src/internal/routing-table' import ServerAddress from '../../src/internal/server-address' +import FakeConnection from './fake-connection' +import lolex from 'lolex' +import { int } from '../../src/integer' +import { PROTOCOL_ERROR, newError, SERVICE_UNAVAILABLE } from '../../src/error' + +const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound' +const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound' describe('#unit Rediscovery', () => { it('should return the routing table when it available', async () => { - const expectedRoutingTable = new RoutingTable({ - database: 'db', - expirationTime: 113, - routers: [ServerAddress.fromUrl('bolt://localhost:7687')], - writers: [ServerAddress.fromUrl('bolt://localhost:7686')], - readers: [ServerAddress.fromUrl('bolt://localhost:7683')] + runWithClockAt(Date.now(), async () => { + const ttl = int(123) + const routers = ['bolt://localhost:7687'] + const writers = ['bolt://localhost:7686'] + const readers = ['bolt://localhost:7683'] + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } + const rawRoutingTable = RawRoutingTable.ofMessageResponse( + newMetadata({ ttl, routers, readers, writers }) + ) + + const expectedRoutingTable = new RoutingTable({ + database: 'db', + expirationTime: calculateExpirationTime(Date.now(), ttl), + routers: [ServerAddress.fromUrl('bolt://localhost:7687')], + writers: [ServerAddress.fromUrl('bolt://localhost:7686')], + readers: [ServerAddress.fromUrl('bolt://localhost:7683')] + }) + + const routingTable = await lookupRoutingTableOnRouter({ + initialAddress, + routingContext, + rawRoutingTable + }) + + expect(routingTable).toEqual(expectedRoutingTable) }) - const routingTableGetter = new FakeRoutingTableGetter( - Promise.resolve(expectedRoutingTable) - ) + }) + + it('should return the routing table null when it is not available', async () => { + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } + const rawRoutingTable = RawRoutingTable.ofNull() const routingTable = await lookupRoutingTableOnRouter({ - routingTableGetter + initialAddress, + routingContext, + rawRoutingTable }) - expect(routingTable).toBe(expectedRoutingTable) + expect(routingTable).toEqual(null) }) - it('should call getter once with correct arguments', async () => { - const expectedRoutingTable = new RoutingTable() - const connection = { connection: 'abc' } - const database = 'adb' - const session = new FakeSession(connection) - const routerAddress = ServerAddress.fromUrl('bolt://localhost:7682') - const routingTableGetter = new FakeRoutingTableGetter( - Promise.resolve(expectedRoutingTable) + it('should call requestRoutingInformation with the correct params', async () => { + const ttl = int(123) + const routers = ['bolt://localhost:7687'] + const writers = ['bolt://localhost:7686'] + const readers = ['bolt://localhost:7683'] + const initialAddress = '127.0.0.1:1245' + const routingContext = { context: '1234 ' } + const database = 'this db' + const rawRoutingTable = RawRoutingTable.ofMessageResponse( + newMetadata({ ttl, routers, readers, writers }) + ) + const connection = new FakeConnection().withRequestRoutingInformationMock( + fakeOnError(rawRoutingTable) ) + const session = new FakeSession(connection) await lookupRoutingTableOnRouter({ - routingTableGetter, - connection, - session, database, - routerAddress + connection, + initialAddress, + routingContext, + rawRoutingTable }) - expect(routingTableGetter._called).toEqual(1) - expect(routingTableGetter._arguments).toEqual([ - connection, - database, - routerAddress, - session - ]) + expect(connection.seenRequestRoutingInformation.length).toEqual(1) + const requestParams = connection.seenRequestRoutingInformation[0] + expect(requestParams.routingContext).toEqual(routingContext) + expect(requestParams.databaseName).toEqual(database) + expect(requestParams.initialAddress).toEqual(initialAddress) + expect(requestParams.sessionContext).toEqual({ + bookmark: session._lastBookmark, + mode: session._mode, + database: session._database, + afterComplete: session._onComplete + }) }) - it('should acquire connection once', async () => { - const expectedRoutingTable = new RoutingTable() - const connection = { connection: 'abc' } - const database = 'adb' - const session = new FakeSession(connection) - const routerAddress = ServerAddress.fromUrl('bolt://localhost:7682') - const routingTableGetter = new FakeRoutingTableGetter( - Promise.resolve(expectedRoutingTable) - ) + it('should reject with DATABASE_NOT_FOUND_CODE when it happens ', async () => { + const expectedError = newError('Laia', DATABASE_NOT_FOUND_CODE) + try { + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } - await lookupRoutingTableOnRouter({ - routingTableGetter, - connection, - session, - database, - routerAddress - }) + const connection = new FakeConnection().withRequestRoutingInformationMock( + fakeOnError(expectedError) + ) + await lookupRoutingTableOnRouter({ + initialAddress, + routingContext, + connection + }) - expect(session._called).toEqual(1) + fail('it should fail') + } catch (error) { + expect(error).toEqual(expectedError) + } }) - it('should create the routingTableGetter with the correct arguments', async () => { - const routingTable = new RoutingTable() - const connection = { connection: 'abc' } - const routingTableGetter = new FakeRoutingTableGetter( - Promise.resolve(routingTable) + it('should reject with PROCEDURE_NOT_FOUND_CODE when it happens ', async () => { + const routerAddress = ServerAddress.fromUrl('bolt://localhost:1235') + const expectedError = newError( + `Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`, + SERVICE_UNAVAILABLE ) - const factory = new FakeRoutingTableGetterFactory(routingTableGetter) + try { + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } - await lookupRoutingTableOnRouter({ - routingTableGetter, - factory, - connection - }) + const connection = new FakeConnection().withRequestRoutingInformationMock( + fakeOnError(newError('1de', PROCEDURE_NOT_FOUND_CODE)) + ) + await lookupRoutingTableOnRouter({ + initialAddress, + routingContext, + connection, + routerAddress + }) - expect(factory._called).toEqual(1) - expect(factory._arguments).toEqual([connection]) + fail('it should fail') + } catch (error) { + expect(error).toEqual(expectedError) + } }) - it('should return null when the getter resolves the table as null', async () => { - const routingTableGetter = new FakeRoutingTableGetter(Promise.resolve(null)) + it('should return null when it happens an unexpected error ocorrus', async () => { + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } + const connection = new FakeConnection().withRequestRoutingInformationMock( + fakeOnError(newError('1de', 'abc')) + ) const routingTable = await lookupRoutingTableOnRouter({ - routingTableGetter + initialAddress, + routingContext, + connection }) - expect(routingTable).toBeNull() + expect(routingTable).toEqual(null) }) - it('should fail when the getter fails', async () => { - const expectedError = 'error' - try { - const routingTableGetter = new FakeRoutingTableGetter( - Promise.reject(expectedError) - ) - await lookupRoutingTableOnRouter({ routingTableGetter }) - fail('should not complete with success') - } catch (error) { - expect(error).toBe(expectedError) - } + it('should throw PROTOCOL_ERROR if the routing table is invalid', async () => { + runWithClockAt(Date.now(), async () => { + try { + const ttl = int(123) + const routers = ['bolt://localhost:7687'] + const writers = ['bolt://localhost:7686'] + const readers = [] + const initialAddress = '127.0.0.1' + const routingContext = { context: '1234 ' } + const rawRoutingTable = RawRoutingTable.ofMessageResponse( + newMetadata({ ttl, routers, readers, writers }) + ) + + await lookupRoutingTableOnRouter({ + initialAddress, + routingContext, + rawRoutingTable + }) + + fail('should not succeed') + } catch (error) { + expect(error).toEqual( + newError( + 'Received no readers from router localhost:7687', + PROTOCOL_ERROR + ) + ) + } + }) }) }) +function newMetadata ({ + ttl = int(42), + routers = [], + readers = [], + writers = [], + extra = [] +} = {}) { + const routersField = { + role: 'ROUTE', + addresses: routers + } + const readersField = { + role: 'READ', + addresses: readers + } + const writersField = { + role: 'WRITE', + addresses: writers + } + return { + rt: { + ttl, + servers: [routersField, readersField, writersField, ...extra] + } + } +} + +async function runWithClockAt (currentTime, callback) { + const clock = lolex.install() + try { + clock.setSystemTime(currentTime) + return await callback(currentTime) + } finally { + clock.uninstall() + } +} + +function calculateExpirationTime (currentTime, ttl) { + return int(currentTime + ttl.toNumber() * 1000) +} + function lookupRoutingTableOnRouter ({ database = 'db', routerAddress = ServerAddress.fromUrl('bolt://localhost:7687'), - routingTableGetter = new FakeRoutingTableGetter( - Promise.resolve(new RoutingTable()) - ), + routingContext = {}, + initialAddress = 'localhost:1235', session, - factory, - connection = {} + connection = new FakeConnection(), + rawRoutingTable } = {}) { - const _factory = - factory || new FakeRoutingTableGetterFactory(routingTableGetter) const _session = session || new FakeSession(connection) - const rediscovery = new Rediscovery(_factory) + + if (connection && rawRoutingTable) { + connection.withRequestRoutingInformationMock( + fakeOnCompleted(rawRoutingTable) + ) + } + + const rediscovery = new Rediscovery(routingContext, initialAddress) return rediscovery.lookupRoutingTableOnRouter( _session, @@ -152,37 +272,14 @@ function lookupRoutingTableOnRouter ({ routerAddress ) } - -class FakeRoutingTableGetter { - constructor (result) { - this._result = result - this._called = 0 - } - - get () { - this._called++ - this._arguments = [...arguments] - return this._result - } -} - -class FakeRoutingTableGetterFactory { - constructor (routingTableGetter) { - this._routingTableGetter = routingTableGetter - this._called = 0 - } - - create () { - this._called++ - this._arguments = [...arguments] - return this._routingTableGetter - } -} - class FakeSession { constructor (connection) { this._connection = connection this._called = 0 + this._lastBookmark = 'lastBook' + this._mode = 'READ' + this._database = 'session db' + this._onComplete = 'moked' } _acquireConnection (callback) { @@ -190,3 +287,11 @@ class FakeSession { return callback(this._connection) } } + +function fakeOnCompleted (raw = null) { + return ({ onCompleted }) => onCompleted(raw) +} + +function fakeOnError (error) { + return ({ onError }) => onError(error) +} diff --git a/test/internal/request-message.test.js b/test/internal/request-message.test.js index 584bbb590..fbd2ebf1f 100644 --- a/test/internal/request-message.test.js +++ b/test/internal/request-message.test.js @@ -215,4 +215,27 @@ describe('#unit RequestMessage', () => { ) }) }) + + describe('BoltV4.3', () => { + it('should create ROUTE message', () => { + const requestContext = { someValue: '1234' } + const database = 'user_db' + + const message = RequestMessage.route(requestContext, database) + + expect(message.signature).toEqual(0x66) + expect(message.fields).toEqual([requestContext, database]) + expect(message.toString()).toEqual( + `ROUTE ${JSON.stringify(requestContext)} ${database}` + ) + }) + + it('should create ROUTE message with default values', () => { + const message = RequestMessage.route() + + expect(message.signature).toEqual(0x66) + expect(message.fields).toEqual([{}, null]) + expect(message.toString()).toEqual(`ROUTE ${JSON.stringify({})} ${null}`) + }) + }) }) diff --git a/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js b/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js deleted file mode 100644 index bbba3f58e..000000000 --- a/test/internal/routing-table-getter/routing-procdeure-runner-multi-database.test.js +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import MultiDatabaseRountingProcuderRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-multi-database' -import ServerAddress from '../../../src/internal/server-address' -import TxConfig from '../../../src/internal/tx-config' -import Result from '../../../src/result' -import FakeConnection from '../fake-connection' -import FakeSession from '../fake-session' - -describe('#unit MultiDatabaseRountingProcuderRunner', () => { - it('should run query over protocol with the correct params', () => { - const initialAddress = ServerAddress.fromUrl('bolt://127.0.0.1') - const bookmark = 'bookmark' - const mode = 'READ' - const database = 'adb' - const sessionDatabase = 'session.database' - const onComplete = () => 'nothing' - const connection = new FakeConnection().withProtocolVersion(3) - const context = { someContext: '1234' } - const session = new FakeSession(null, connection) - .withBookmark(bookmark) - .withMode(mode) - .withDatabase(sessionDatabase) - .withOnComplete(onComplete) - - run({ connection, context, session, database, initialAddress }) - - expect(connection.seenQueries).toEqual([ - 'CALL dbms.routing.getRoutingTable($context, $database)' - ]) - expect(connection.seenParameters).toEqual([ - { context: { ...context, address: initialAddress }, database } - ]) - expect(connection.seenProtocolOptions).toEqual([ - { - bookmark, - txConfig: TxConfig.empty(), - mode, - database: sessionDatabase, - afterComplete: onComplete - } - ]) - }) - - it('should return a result', () => { - const connection = new FakeConnection().withProtocolVersion(3) - const context = { someContext: '1234' } - const session = new FakeSession(null, connection) - - const result = run({ connection, context, session }) - - expect(result).toEqual(jasmine.any(Result)) - expect(result._streamObserverPromise).toEqual(jasmine.any(Promise)) - }) -}) - -function run ({ - connection, - database = 'adb', - context, - session, - initialAddress -}) { - const runner = new MultiDatabaseRountingProcuderRunner(initialAddress) - return runner.run(connection, database, context, session) -} diff --git a/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js b/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js deleted file mode 100644 index 461c573f0..000000000 --- a/test/internal/routing-table-getter/routing-procedure-runner-single-database.test.js +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import SingleDatabaseRountingProcuderRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-single-database' -import TxConfig from '../../../src/internal/tx-config' -import Result from '../../../src/result' -import FakeConnection from '../fake-connection' -import FakeSession from '../fake-session' - -describe('#unit SingleDatabaseRountingProcuderRunner', () => { - it('should run query over protocol with the correct params', () => { - const bookmark = 'bookmark' - const mode = 'READ' - const sessionDatabase = 'session.database' - const onComplete = () => 'nothing' - const connection = new FakeConnection().withProtocolVersion(3) - const context = { someContext: '1234' } - const session = new FakeSession(null, connection) - .withBookmark(bookmark) - .withMode(mode) - .withDatabase(sessionDatabase) - .withOnComplete(onComplete) - - run({ connection, context, session }) - - expect(connection.seenQueries).toEqual([ - 'CALL dbms.cluster.routing.getRoutingTable($context)' - ]) - expect(connection.seenParameters).toEqual([{ context }]) - expect(connection.seenProtocolOptions).toEqual([ - { - bookmark, - txConfig: TxConfig.empty(), - mode, - database: sessionDatabase, - afterComplete: onComplete - } - ]) - }) - - it('should return a result', () => { - const connection = new FakeConnection().withProtocolVersion(3) - const context = { someContext: '1234' } - const session = new FakeSession(null, connection) - - const result = run({ connection, context, session }) - - expect(result).toEqual(jasmine.any(Result)) - expect(result._streamObserverPromise).toEqual(jasmine.any(Promise)) - }) -}) - -function run ({ connection, database = 'adb', context, session }) { - const runner = new SingleDatabaseRountingProcuderRunner() - return runner.run(connection, database, context, session) -} diff --git a/test/internal/routing-table-getter/routing-table-getter-factory.test.js b/test/internal/routing-table-getter/routing-table-getter-factory.test.js deleted file mode 100644 index 2f5895b53..000000000 --- a/test/internal/routing-table-getter/routing-table-getter-factory.test.js +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import RoutingTableGetterFactory from '../../../src/internal/routing-table-getter/routing-table-getter-factory' -import ProcedureRoutingTableGetter from '../../../src/internal/routing-table-getter/routing-table-getter-procedure' -import MultiDatabaseRoutingProcedureRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-multi-database' -import SingleDatabaseRoutingProcedureRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner-single-database' -import FakeConnection from '../fake-connection' - -const routingTableProcedureVersions = [3, 4, 4.1, 4.2] -const singleDatabaseProcedureVersion = routingTableProcedureVersions.filter( - version => version < 4 -) -const multiDatabaseProcedureVersion = routingTableProcedureVersions.filter( - version => version >= 4 -) - -describe('#unit RoutingTableGetterFactory', () => { - routingTableProcedureVersions.forEach(version => { - it(`should create ProcedureRoutingTableGetter when the protocol version is ${version}`, () => { - const connection = new FakeConnection().withProtocolVersion(version) - const routingContext = { region: 'china' } - const getter = createRoutingTableGetter({ connection, routingContext }) - - expect(getter).toEqual(jasmine.any(ProcedureRoutingTableGetter)) - expect(getter._routingContext).toEqual(routingContext) - expect(getter._) - }) - }) - - singleDatabaseProcedureVersion.forEach(version => { - it(`should configure SingleDatabaseRoutingProcedureRunner as the runner in the getter when the protocol version is ${version}`, () => { - const connection = new FakeConnection().withProtocolVersion(3) - const getter = createRoutingTableGetter({ connection }) - - expect(getter._runner).toEqual( - jasmine.any(SingleDatabaseRoutingProcedureRunner) - ) - }) - }) - - multiDatabaseProcedureVersion.forEach(version => { - it(`should configure MultiDatabaseRoutingProcedureRunner as the runner in the getter when the protocol version is ${version}`, () => { - const connection = new FakeConnection().withProtocolVersion(version) - const initialAddress = 'localhost' - const getter = createRoutingTableGetter({ connection, initialAddress }) - - expect(getter._runner).toEqual( - jasmine.any(MultiDatabaseRoutingProcedureRunner) - ) - expect(getter._runner._initialAddress).toEqual(initialAddress) - }) - }) - - function createRoutingTableGetter ({ - connection, - routingContext = {}, - initialAddress = '127.0.0.1' - }) { - const factory = new RoutingTableGetterFactory( - routingContext, - initialAddress - ) - return factory.create(connection) - } -}) diff --git a/test/internal/routing-table-getter/routing-table-getter-procedure.test.js b/test/internal/routing-table-getter/routing-table-getter-procedure.test.js deleted file mode 100644 index e98e70312..000000000 --- a/test/internal/routing-table-getter/routing-table-getter-procedure.test.js +++ /dev/null @@ -1,481 +0,0 @@ -/** - * Copyright (c) 2002-2020 "Neo4j," - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import ProcedureRoutingTableGetter from '../../../src/internal/routing-table-getter/routing-table-getter-procedure' -import RoutingProcedurRunner from '../../../src/internal/routing-table-getter/routing-procedure-runner' -import FakeConnection from '../fake-connection' -import { - newError, - SERVICE_UNAVAILABLE, - PROTOCOL_ERROR -} from '../../../lib/error' -import ServerAddress from '../../../src/internal/server-address' -import Record from '../../../src/record' -import Integer, { int } from '../../../src/integer' -import RoutingTable from '../../../src/internal/routing-table' -import lolex from 'lolex' - -const invalidRecords = [ - [], - [newRecord(), newRecord()], - [newRecord(), newRecord(), newRecord()] -] - -const invalidAddressesFieldValues = [ - 'localhost:1234', - [{ address: 'localhost:1244' }], - null, - 23 -] - -const invalidTtlValues = [null, undefined] - -describe('#unit ProcedureRoutingTableGetter', () => { - it('should call the runner with the correct params', async () => { - const routerContext = { region: 'china' } - const connection = new FakeConnection() - const database = 'fake-db' - const session = { session: 'sec' } - - const runnerCalledWith = { - capturer (connection, database, routerContext, session) { - this.connection = connection - this.database = database - this.routerContext = routerContext - this.session = session - return fakeResolvedResult() - } - } - - try { - await callProcedureRoutingTableGetter({ - routerContext, - connection, - database, - session, - runner: new FakeRoutingProcedureRunner({ - run: runnerCalledWith.capturer.bind(runnerCalledWith) - }) - }) - } finally { - expect(runnerCalledWith.connection).toEqual(connection) - expect(runnerCalledWith.database).toEqual(database) - expect(runnerCalledWith.routerContext).toEqual(routerContext) - expect(runnerCalledWith.session).toEqual(session) - } - }) - - it('should return the routing table', () => - runWithClockAt(Date.now(), async currentTime => { - const ttl = int(42) - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - const result = await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) - }) - }) - - expect(result).toEqual( - new RoutingTable({ - database, - readers: readers.map(r => ServerAddress.fromUrl(r)), - routers: routers.map(r => ServerAddress.fromUrl(r)), - writers: writers.map(w => ServerAddress.fromUrl(w)), - expirationTime: calculateExpirationTime(currentTime, ttl) - }) - ) - })) - - it('should return Integer.MAX_VALUE as expirationTime when ttl overflowed', async () => { - const ttl = int(Integer.MAX_VALUE - 2) - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - const result = await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) - }) - }) - expect(result.expirationTime).toEqual(Integer.MAX_VALUE) - }) - - it('should return Integer.MAX_VALUE as expirationTime when ttl is negative', async () => { - const ttl = int(-2) - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - const result = await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ ttl, routers, readers, writers })]) - }) - }) - expect(result.expirationTime).toEqual(Integer.MAX_VALUE) - }) - - invalidTtlValues.forEach(invalidTtlValue => { - it(`should throw PROTOCOL_ERROR when ttl is not valid [${invalidTtlValue}]`, async () => { - try { - const ttl = invalidTtlValue - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([ - newRecord({ ttl, routers, readers, writers }) - ]) - }) - }) - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain('Unable to parse TTL entry from router') - } - }) - }) - - it('should throw PROTOCOL_ERROR when ttl is not in the record', async () => { - try { - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => fakeResolvedResult([new Record(['noTtl'], ['1234'])]) - }) - }) - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain('Unable to parse TTL entry from router') - } - }) - - invalidAddressesFieldValues.forEach(invalidAddressFieldValue => { - it(`should throw PROTOCOL_ERROR when routers is not valid [${invalidAddressFieldValue}]`, async () => { - try { - const routers = invalidAddressFieldValue - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain( - 'Unable to parse servers entry from router' - ) - } - }) - - it(`should throw PROTOCOL_ERROR when readers is not valid [${invalidAddressFieldValue}]`, async () => { - try { - const routers = ['router:7699'] - const readers = invalidAddressFieldValue - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain( - 'Unable to parse servers entry from router' - ) - } - }) - - it(`should throw PROTOCOL_ERROR when writers is not valid [${invalidAddressFieldValue}]`, async () => { - try { - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = invalidAddressFieldValue - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain( - 'Unable to parse servers entry from router' - ) - } - }) - }) - - it('should throw PROTOCOL_ERROR when it has an alien role', async () => { - try { - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const alienRole = { - role: 'ALIEN_ROLE', - addresses: ['alien:7699'] - } - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([ - newRecord({ routers, readers, writers, extra: [alienRole] }) - ]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain( - 'Unable to parse servers entry from router' - ) - } - }) - - it('should throw PROTOCOL_ERROR when there is no routers', async () => { - try { - const routers = [] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain('Received no') - } - }) - - it('should throw PROTOCOL_ERROR when there is no readers', async () => { - try { - const routers = ['router:7699'] - const readers = [] - const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] - const database = 'db' - await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - fail('should not succeed') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain('Received no') - } - }) - - it('should return the routing when there is no writers', async () => { - const routers = ['router:7699'] - const readers = ['reader1:7699', 'reader2:7699'] - const writers = [] - const database = 'db' - const routingTable = await callProcedureRoutingTableGetter({ - database, - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeResolvedResult([newRecord({ routers, readers, writers })]) - }) - }) - expect(routingTable.readers).toEqual( - readers.map(r => ServerAddress.fromUrl(r)) - ) - expect(routingTable.routers).toEqual( - routers.map(r => ServerAddress.fromUrl(r)) - ) - expect(routingTable.writers).toEqual( - writers.map(r => ServerAddress.fromUrl(r)) - ) - }) - - invalidRecords.forEach(records => { - it(`should throws PROTOCOL_ERROR when records lenght is ${records.length}`, async () => { - try { - await callProcedureRoutingTableGetter({ - runner: new FakeRoutingProcedureRunner({ - run: () => fakeResolvedResult(records) - }) - }) - fail('should throws an exception') - } catch (error) { - expect(error.code).toEqual(PROTOCOL_ERROR) - expect(error.message).toContain('Illegal response from router') - } - }) - }) - - it('should throws Neo.ClientError.Database.DatabaseNotFound when this error occurs while run the query', async () => { - const expectedError = newError( - 'Some messsage', - 'Neo.ClientError.Database.DatabaseNotFound' - ) - try { - await callProcedureRoutingTableGetter({ - runner: new FakeRoutingProcedureRunner({ - run: () => fakeRejectedResult(expectedError) - }) - }) - fail('Expect to throws exception') - } catch (error) { - expect(error).toEqual(expectedError) - } - }) - - it('should throws SERVICE_UNAVAILABLE when Neo.ClientError.Procedure.ProcedureNotFound occurs', async () => { - try { - await callProcedureRoutingTableGetter({ - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeRejectedResult( - newError( - 'Some messsage', - 'Neo.ClientError.Procedure.ProcedureNotFound' - ) - ) - }) - }) - fail('Expect to throws exception') - } catch (error) { - expect(error.code).toEqual(SERVICE_UNAVAILABLE) - } - }) - - it('should return null when another error ocurrs', async () => { - const result = await callProcedureRoutingTableGetter({ - runner: new FakeRoutingProcedureRunner({ - run: () => - fakeRejectedResult(newError('Some messsage', 'another error')) - }) - }) - expect(result).toBeNull() - }) - - function callProcedureRoutingTableGetter ({ - routerContext = {}, - runner = new FakeRoutingProcedureRunner(), - connection = new FakeConnection(), - database = 'adb', - routerAddress = 'neo4j://127.0.0.1:7687', - session = {} - }) { - const getter = new ProcedureRoutingTableGetter(routerContext, runner) - return getter.get( - connection, - database, - ServerAddress.fromUrl(routerAddress), - session - ) - } -}) - -class FakeRoutingProcedureRunner extends RoutingProcedurRunner { - constructor ({ run = shouldNotBeCalled }) { - super() - this._run = run - } - - run (connection, database, routerContext, session) { - return this._run(connection, database, routerContext, session) - } -} - -function newRecord ({ - ttl = int(42), - routers = [], - readers = [], - writers = [], - extra = [] -} = {}) { - const routersField = { - role: 'ROUTE', - addresses: routers - } - const readersField = { - role: 'READ', - addresses: readers - } - const writersField = { - role: 'WRITE', - addresses: writers - } - return new Record( - ['ttl', 'servers'], - [ttl, [...extra, routersField, readersField, writersField]] - ) -} - -async function runWithClockAt (currentTime, callback) { - const clock = lolex.install() - try { - clock.setSystemTime(currentTime) - return await callback(currentTime) - } finally { - clock.uninstall() - } -} - -function calculateExpirationTime (currentTime, ttl) { - return int(currentTime + ttl.toNumber() * 1000) -} - -function fakeResolvedResult (records = null) { - return Promise.resolve({ - records - }) -} - -function fakeRejectedResult (error) { - return Promise.reject(error) -} - -function shouldNotBeCalled () { - throw new Error('Should not be called') -} diff --git a/test/internal/routing-table-raw.test.js b/test/internal/routing-table-raw.test.js new file mode 100644 index 000000000..29034ccd1 --- /dev/null +++ b/test/internal/routing-table-raw.test.js @@ -0,0 +1,172 @@ +import RawRoutingTable from '../../src/internal/routing-table-raw' +import Record from '../../src/record' + +describe('#unit RawRoutingTable', () => { + describe('ofNull', () => { + shouldReturnNullRawRoutingTable(() => RawRoutingTable.ofNull()) + }) + + describe('ofRecord', () => { + describe('when record is null', () => { + shouldReturnNullRawRoutingTable(() => RawRoutingTable.ofRecord(null)) + }) + + describe('when record has servers and ttl', () => { + it('should return isNull equals false', () => { + const record = newRecord({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.isNull).toEqual(false) + }) + + it('should return the ttl', () => { + const record = newRecord({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.ttl).toEqual(123) + }) + + it('should return the ttl', () => { + const record = newRecord({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.servers).toEqual([ + { role: 'READ', addresses: ['127.0.0.1'] } + ]) + }) + }) + + describe('when record has servers and but no ttl', () => { + it('should return isNull equals false', () => { + const record = newRecord({ + noTtl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.isNull).toEqual(false) + }) + + it('should throws when try to get ttl', () => { + const record = newRecord({ + noTtl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(() => result.ttl).toThrow() + }) + + it('should return the servers', () => { + const record = newRecord({ + noTtl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.servers).toEqual([ + { role: 'READ', addresses: ['127.0.0.1'] } + ]) + }) + }) + + describe('when record has ttl and but no servers', () => { + it('should return isNull equals false', () => { + const record = newRecord({ + ttl: 123, + noServers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.isNull).toEqual(false) + }) + + it('should return the ttl', () => { + const record = newRecord({ + ttl: 123, + noServers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(result.ttl).toEqual(123) + }) + + it('should hrows when try to get servers', () => { + const record = newRecord({ + ttl: 123, + noServers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofRecord(record) + expect(() => result.servers).toThrow() + }) + }) + }) + + describe('ofMessageResponse', () => { + shouldReturnNullRawRoutingTable(() => + RawRoutingTable.ofMessageResponse(null) + ) + + it('should return isNull equals false', () => { + const response = newResponse({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofMessageResponse(response) + expect(result.isNull).toEqual(false) + }) + + it('should return the ttl', () => { + const response = newResponse({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofMessageResponse(response) + expect(result.ttl).toEqual(123) + }) + + it('should return the ttl', () => { + const response = newResponse({ + ttl: 123, + servers: [{ role: 'READ', addresses: ['127.0.0.1'] }] + }) + const result = RawRoutingTable.ofMessageResponse(response) + expect(result.servers).toEqual([ + { role: 'READ', addresses: ['127.0.0.1'] } + ]) + }) + }) + + function shouldReturnNullRawRoutingTable (subject) { + it('should create a null routing table', () => { + const result = subject() + + expect(result.isNull).toEqual(true) + }) + + it('should not implement ttl', () => { + expect(() => { + const ttl = subject().ttl + fail(`it should not return ${ttl}`) + }).toThrow(new Error('Not implemented')) + }) + + it('should not implement servers', () => { + expect(() => { + const servers = subject().servers + fail(`it should not return ${servers}`) + }).toThrow(new Error('Not implemented')) + }) + } + + function newRecord (params = {}) { + return new Record(Object.keys(params), Object.values(params)) + } + + function newResponse (params = {}) { + return { + rt: { ...params } + } + } +}) diff --git a/test/internal/routing-table.test.js b/test/internal/routing-table.test.js index 2718ffc87..25803eb59 100644 --- a/test/internal/routing-table.test.js +++ b/test/internal/routing-table.test.js @@ -17,9 +17,21 @@ * limitations under the License. */ import RoutingTable from '../../src/internal/routing-table' -import { int } from '../../src/integer' +import Integer, { int } from '../../src/integer' import { READ, WRITE } from '../../src/driver' import ServerAddress from '../../src/internal/server-address' +import RawRoutingTable from '../../src/internal/routing-table-raw' +import { PROTOCOL_ERROR } from '../../src/error' +import lolex from 'lolex' + +const invalidAddressesFieldValues = [ + 'localhost:1234', + [{ address: 'localhost:1244' }], + null, + 23 +] + +const invalidTtlValues = [null, undefined] describe('#unit RoutingTable', () => { const server1 = ServerAddress.fromUrl('server1') @@ -222,6 +234,282 @@ describe('#unit RoutingTable', () => { } }) + describe('fromRawRoutingTable', () => { + it('should return the routing table', () => + runWithClockAt(Date.now(), async currentTime => { + const ttl = int(42) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + const result = routingTable({ + database, + metadata: newMetadata({ ttl, routers, readers, writers }) + }) + + expect(result).toEqual( + new RoutingTable({ + database, + readers: readers.map(r => ServerAddress.fromUrl(r)), + routers: routers.map(r => ServerAddress.fromUrl(r)), + writers: writers.map(w => ServerAddress.fromUrl(w)), + expirationTime: calculateExpirationTime(currentTime, ttl) + }) + ) + })) + + it('should return Integer.MAX_VALUE as expirationTime when ttl overflowed', async () => { + const ttl = int(Integer.MAX_VALUE - 2) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + const result = routingTable({ + database, + metadata: newMetadata({ ttl, routers, readers, writers }) + }) + + expect(result.expirationTime).toEqual(Integer.MAX_VALUE) + }) + + it('should return Integer.MAX_VALUE as expirationTime when ttl is negative', async () => { + const ttl = int(-2) + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + const result = routingTable({ + database, + metadata: newMetadata({ ttl, routers, readers, writers }) + }) + + expect(result.expirationTime).toEqual(Integer.MAX_VALUE) + }) + + invalidTtlValues.forEach(invalidTtlValue => { + it(`should throw PROTOCOL_ERROR when ttl is not valid [${invalidTtlValue}]`, async () => { + try { + const ttl = invalidTtlValue + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ ttl, routers, readers, writers }) + }) + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse TTL entry from router' + ) + } + }) + }) + + it('should throw PROTOCOL_ERROR when ttl is not in the metatadata', async () => { + try { + const database = 'db' + + routingTable({ database, metadata: { rt: { noTtl: 123 } } }) + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Unable to parse TTL entry from router') + } + }) + + invalidAddressesFieldValues.forEach(invalidAddressFieldValue => { + it(`should throw PROTOCOL_ERROR when routers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = invalidAddressFieldValue + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + + it(`should throw PROTOCOL_ERROR when readers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = ['router:7699'] + const readers = invalidAddressFieldValue + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + + it(`should throw PROTOCOL_ERROR when writers is not valid [${invalidAddressFieldValue}]`, async () => { + try { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = invalidAddressFieldValue + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain( + 'Unable to parse servers entry from router' + ) + } + }) + }) + + it('should return the known roles independent of the alien roles', async () => { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const alienRole = { + role: 'ALIEN_ROLE', + addresses: ['alien:7699'] + } + const database = 'db' + + const result = routingTable({ + database, + metadata: newMetadata({ routers, readers, writers, extra: [alienRole] }) + }) + + expect(result.readers).toEqual(readers.map(r => ServerAddress.fromUrl(r))) + expect(result.routers).toEqual(routers.map(r => ServerAddress.fromUrl(r))) + expect(result.writers).toEqual(writers.map(r => ServerAddress.fromUrl(r))) + }) + + it('should throw PROTOCOL_ERROR when there is no routers', async () => { + try { + const routers = [] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Received no') + } + }) + + it('should throw PROTOCOL_ERROR when there is no readers', async () => { + try { + const routers = ['router:7699'] + const readers = [] + const writers = ['writer1:7693', 'writer2:7692', 'writer3:7629'] + const database = 'db' + + routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + + fail('should not succeed') + } catch (error) { + expect(error.code).toEqual(PROTOCOL_ERROR) + expect(error.message).toContain('Received no') + } + }) + + it('should return the routing when there is no writers', async () => { + const routers = ['router:7699'] + const readers = ['reader1:7699', 'reader2:7699'] + const writers = [] + const database = 'db' + const result = routingTable({ + database, + metadata: newMetadata({ routers, readers, writers }) + }) + + expect(result.readers).toEqual(readers.map(r => ServerAddress.fromUrl(r))) + expect(result.routers).toEqual(routers.map(r => ServerAddress.fromUrl(r))) + expect(result.writers).toEqual(writers.map(r => ServerAddress.fromUrl(r))) + }) + + function routingTable ({ + database = 'database', + serverAddress = 'localhost:7687', + metadata + } = {}) { + return RoutingTable.fromRawRoutingTable( + database, + ServerAddress.fromUrl(serverAddress), + RawRoutingTable.ofMessageResponse(metadata) + ) + } + + function newMetadata ({ + ttl = int(42), + routers = [], + readers = [], + writers = [], + extra = [] + } = {}) { + const routersField = { + role: 'ROUTE', + addresses: routers + } + const readersField = { + role: 'READ', + addresses: readers + } + const writersField = { + role: 'WRITE', + addresses: writers + } + return { + rt: { + ttl, + servers: [routersField, readersField, writersField, ...extra] + } + } + } + + async function runWithClockAt (currentTime, callback) { + const clock = lolex.install() + try { + clock.setSystemTime(currentTime) + return await callback(currentTime) + } finally { + clock.uninstall() + } + } + + function calculateExpirationTime (currentTime, ttl) { + return int(currentTime + ttl.toNumber() * 1000) + } + }) function expired (expiredFor) { return Date.now() - (expiredFor || 3600) // expired an hour ago } diff --git a/test/internal/stream-observer.test.js b/test/internal/stream-observer.test.js index a2605efd1..0939dd6b0 100644 --- a/test/internal/stream-observer.test.js +++ b/test/internal/stream-observer.test.js @@ -18,7 +18,14 @@ */ import FakeConnection from './fake-connection' -import { ResultStreamObserver } from '../../src/internal/stream-observers' +import { + ResultStreamObserver, + RouteObserver, + ProcedureRouteObserver +} from '../../src/internal/stream-observers' +import RawRoutingTable from '../../src/internal/routing-table-raw' +import { PROTOCOL_ERROR, newError } from '../../src/error' +import Record from '../../src/record' const NO_OP = () => {} @@ -193,6 +200,283 @@ describe('#unit ResultStreamObserver', () => { }) }) +describe('#unit RouteObserver', () => { + it('should call onCompleted with the metadata', () => { + let onCompleteCalled = false + const expectedMetadata = { someMeta: '134' } + + newRouteObserver({ + onCompleted: metadata => { + onCompleteCalled = true + expect(metadata).toEqual( + RawRoutingTable.ofMessageResponse(expectedMetadata) + ) + } + }).onCompleted(expectedMetadata) + + expect(onCompleteCalled).toEqual(true) + }) + + it('should call onError with the error', () => { + let onErrorCalled = false + const expectedError = newError('something wrong') + + newRouteObserver({ + onError: metadata => { + onErrorCalled = true + expect(metadata).toBe(expectedError) + } + }).onError(expectedError) + + expect(onErrorCalled).toEqual(true) + }) + + it('should call onError with a protocol error', () => { + let onErrorCalled = false + const expectedError = newError('something wrong', PROTOCOL_ERROR) + + newRouteObserver({ + onError: metadata => { + onErrorCalled = true + expect(metadata).toBe(expectedError) + } + }).onError(expectedError) + + expect(onErrorCalled).toEqual(true) + }) + + it('should call connection._handleProtocolError when a protocol error occurs', () => { + const connection = new FakeConnection() + const expectedError = newError('something wrong', PROTOCOL_ERROR) + + newRouteObserver({ + onError: null, + connection + }).onError(expectedError) + + expect(connection.protocolErrorsHandled).toEqual(1) + expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + }) + + it('should call onError with a protocol error it receive a record', () => { + let onErrorCalled = false + const record = new Record(['a'], ['b']) + const expectedError = newError( + 'Received RECORD when resetting: received record is: ' + + JSON.stringify(record), + PROTOCOL_ERROR + ) + + newRouteObserver({ + onError: error => { + onErrorCalled = true + expect(error).toEqual(expectedError) + } + }).onNext(record) + + expect(onErrorCalled).toEqual(true) + }) + + it('should call connection._handleProtocolError with a protocol error it receive a record', () => { + const connection = new FakeConnection() + const record = new Record(['a'], ['b']) + const expectedErrorMessage = + 'Received RECORD when resetting: received record is: ' + + JSON.stringify(record) + + newRouteObserver({ + onError: null, + connection + }).onNext(record) + + expect(connection.protocolErrorsHandled).toEqual(1) + expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + }) + + function newRouteObserver ({ + onCompleted = shouldNotBeCalled('onComplete'), + onError = shouldNotBeCalled('onError'), + connection = new FakeConnection() + } = {}) { + return new RouteObserver({ connection, onCompleted, onError }) + } + + function shouldNotBeCalled (methodName) { + return () => fail(`${methodName} should not be called`) + } +}) + +describe('#unit ProcedureRouteObserver', () => { + it('should call resultObserver.subscribe on the constructor', () => { + const resultObserver = new FakeResultStreamObserver() + const observer = newRouteObserver({ resultObserver }) + + expect(resultObserver.subscribedObservers.length).toEqual(1) + expect(resultObserver.subscribedObservers[0]).toEqual(observer) + }) + + it('should call onCompleted with the RawRoutingTable of Record if it has 1 records', () => { + let onCompleteCalled = false + const record = new Record(['a'], ['b']) + const observer = newRouteObserver({ + onCompleted: metadata => { + onCompleteCalled = true + expect(metadata).toEqual(RawRoutingTable.ofRecord(record)) + } + }) + + observer.onNext(record) + observer.onCompleted() + + expect(onCompleteCalled).toEqual(true) + }) + + it('should call onError with a protocol error it receive 0 records', () => { + let onErrorCalled = false + const expectedError = newError( + 'Illegal response from router. Received 0 records but expected only one.\n' + + JSON.stringify([]), + PROTOCOL_ERROR + ) + const observer = newRouteObserver({ + onError: error => { + onErrorCalled = true + expect(error).toEqual(expectedError) + } + }) + + observer.onCompleted() + + expect(onErrorCalled).toEqual(true) + }) + + it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { + const connection = new FakeConnection() + const expectedErrorMessage = + 'Illegal response from router. Received 0 records but expected only one.\n' + + JSON.stringify([]) + + newRouteObserver({ + onError: null, + connection + }).onCompleted() + + expect(connection.protocolErrorsHandled).toEqual(1) + expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + }) + + it('should call onError with a protocol error it receive more than one record', () => { + let onErrorCalled = false + const record = new Record(['a'], ['b']) + const expectedError = newError( + 'Illegal response from router. Received 2 records but expected only one.\n' + + JSON.stringify([record, record]), + PROTOCOL_ERROR + ) + const observer = newRouteObserver({ + onError: error => { + onErrorCalled = true + expect(error).toEqual(expectedError) + } + }) + + observer.onNext(record) + observer.onNext(record) + observer.onCompleted() + + expect(onErrorCalled).toEqual(true) + }) + + it('should call connection._handleProtocolError with a protocol error it receive 0 records', () => { + const connection = new FakeConnection() + const record = new Record(['a'], ['b']) + const expectedErrorMessage = + 'Illegal response from router. Received 2 records but expected only one.\n' + + JSON.stringify([record, record]) + + const observer = newRouteObserver({ + onError: null, + connection + }) + + observer.onNext(record) + observer.onNext(record) + observer.onCompleted() + + expect(connection.protocolErrorsHandled).toEqual(1) + expect(connection.seenProtocolErrors).toEqual([expectedErrorMessage]) + }) + + it('should call onError with the error', () => { + let onErrorCalled = false + const expectedError = newError('something wrong') + + newRouteObserver({ + onError: metadata => { + onErrorCalled = true + expect(metadata).toBe(expectedError) + } + }).onError(expectedError) + + expect(onErrorCalled).toEqual(true) + }) + + it('should call onError with a protocol error', () => { + let onErrorCalled = false + const expectedError = newError('something wrong', PROTOCOL_ERROR) + + newRouteObserver({ + onError: metadata => { + onErrorCalled = true + expect(metadata).toBe(expectedError) + } + }).onError(expectedError) + + expect(onErrorCalled).toEqual(true) + }) + + it('should call connection._handleProtocolError when a protocol error occurs', () => { + const connection = new FakeConnection() + const expectedError = newError('something wrong', PROTOCOL_ERROR) + + newRouteObserver({ + onError: null, + connection + }).onError(expectedError) + + expect(connection.protocolErrorsHandled).toEqual(1) + expect(connection.seenProtocolErrors).toEqual([expectedError.message]) + }) + + function newRouteObserver ({ + onCompleted = shouldNotBeCalled('onComplete'), + onError = shouldNotBeCalled('onError'), + connection = new FakeConnection(), + resultObserver = new FakeResultStreamObserver() + } = {}) { + return new ProcedureRouteObserver({ + resultObserver, + connection, + onCompleted, + onError + }) + } + + function shouldNotBeCalled (methodName) { + return () => fail(`${methodName} should not be called`) + } + + class FakeResultStreamObserver { + constructor () { + this.subscribedObservers = [] + } + + subscribe (observer) { + this.subscribedObservers.push(observer) + } + } +}) + function newStreamObserver (connection) { return new ResultStreamObserver({ connection