Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create 4.3 Protocol and rediscovery the Routing Table using the Route Message #646

Merged
merged 21 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion src/internal/bolt-protocol-v3.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import { assertDatabaseIsEmpty } from './bolt-protocol-util'
import {
StreamObserver,
LoginObserver,
ResultStreamObserver
ResultStreamObserver,
ProcedureRouteObserver
} from './stream-observers'
import { BOLT_PROTOCOL_V3 } from './constants'
import Bookmark from './bookmark'
import TxConfig from './tx-config'
const CONTEXT = 'context'
const CALL_GET_ROUTING_TABLE = `CALL dbms.cluster.routing.getRoutingTable($${CONTEXT})`

const noOpObserver = new StreamObserver()

Expand Down Expand Up @@ -183,4 +188,39 @@ export default class BoltProtocol extends BoltProtocolV2 {

return observer
}

/**
* Request routing information
*
* @param {Object} param -
* @param {object} param.routingContext The routing context used to define the routing table.
* Multi-datacenter deployments is one of its use cases
* @param {string} param.databaseName The database name
* @param {Bookmark} params.sessionContext.bookmark The bookmark used for request the routing table
* @param {string} params.sessionContext.mode The session mode
* @param {string} params.sessionContext.database The database name used on the session
* @param {function()} params.sessionContext.afterComplete The session param used after the session closed
* @param {function(err: Error)} param.onError
* @param {function(RawRoutingTable)} param.onCompleted
* @returns {RouteObserver} the route observer
*/
requestRoutingInformation ({
routingContext = {},
sessionContext = {},
onError,
onCompleted
}) {
const resultObserver = this.run(
CALL_GET_ROUTING_TABLE,
{ [CONTEXT]: routingContext },
{ ...sessionContext, txConfig: TxConfig.empty() }
)

return new ProcedureRouteObserver({
resultObserver,
connection: this._connection,
onError,
onCompleted
})
}
}
51 changes: 50 additions & 1 deletion src/internal/bolt-protocol-v4x0.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@
*/
import BoltProtocolV3 from './bolt-protocol-v3'
import RequestMessage, { ALL } from './request-message'
import { ResultStreamObserver } from './stream-observers'
import {
ResultStreamObserver,
ProcedureRouteObserver
} from './stream-observers'
import { BOLT_PROTOCOL_V4_0 } from './constants'
import Bookmark from './bookmark'
import TxConfig from './tx-config'

const CONTEXT = 'context'
const DATABASE = 'database'
const CALL_GET_ROUTING_TABLE_MULTI_DB = `CALL dbms.routing.getRoutingTable($${CONTEXT}, $${DATABASE})`

export default class BoltProtocol extends BoltProtocolV3 {
get version () {
Expand Down Expand Up @@ -119,4 +128,44 @@ export default class BoltProtocol extends BoltProtocolV3 {
}

_noOp () {}

/**
* Request routing information
*
* @param {Object} param -
* @param {object} param.routingContext The routing context used to define the routing table.
* Multi-datacenter deployments is one of its use cases
* @param {string} param.databaseName The database name
* @param {Bookmark} params.sessionContext.bookmark The bookmark used for request the routing table
* @param {string} params.sessionContext.mode The session mode
* @param {string} params.sessionContext.database The database name used on the session
* @param {function()} params.sessionContext.afterComplete The session param used after the session closed
* @param {function(err: Error)} param.onError
* @param {function(RawRoutingTable)} param.onCompleted
* @returns {RouteObserver} the route observer
*/
requestRoutingInformation ({
routingContext = {},
databaseName = null,
sessionContext = {},
initialAddress = null,
onError,
onCompleted
}) {
const resultObserver = this.run(
CALL_GET_ROUTING_TABLE_MULTI_DB,
{
[CONTEXT]: { ...routingContext, address: initialAddress },
[DATABASE]: databaseName
},
{ ...sessionContext, txConfig: TxConfig.empty() }
)

return new ProcedureRouteObserver({
resultObserver,
connection: this._connection,
onError,
onCompleted
})
}
}
11 changes: 0 additions & 11 deletions src/internal/bolt-protocol-v4x2.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ import BoltProtocolV41 from './bolt-protocol-v4x1'
import { BOLT_PROTOCOL_V4_2 } from './constants'

export default class BoltProtocol extends BoltProtocolV41 {
/**
* @constructor
* @param {Connection} connection the connection.
* @param {Chunker} chunker the chunker.
* @param {boolean} disableLosslessIntegers if this connection should convert all received integers to native JS numbers.
* @param {Object} serversideRouting
*/
constructor (connection, chunker, disableLosslessIntegers, serversideRouting) {
super(connection, chunker, disableLosslessIntegers, serversideRouting)
}

get version () {
return BOLT_PROTOCOL_V4_2
}
Expand Down
65 changes: 65 additions & 0 deletions src/internal/bolt-protocol-v4x3.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import BoltProtocolV42 from './bolt-protocol-v4x2'
import { BOLT_PROTOCOL_V4_3 } from './constants'
import RequestMessage from './request-message'
import { RouteObserver } from './stream-observers'

export default class BoltProtocol extends BoltProtocolV42 {
get version () {
return BOLT_PROTOCOL_V4_3
}

/**
* Request routing information
*
* @param {Object} param -
* @param {object} param.routingContext The routing context used to define the routing table.
* Multi-datacenter deployments is one of its use cases
* @param {string} param.databaseName The database name
* @param {function(err: Error)} param.onError
* @param {function(RawRoutingTable)} param.onCompleted
* @returns {RouteObserver} the route observer
*/

requestRoutingInformation ({
routingContext = {},
databaseName = null,
initialAddress = null,
onError,
onCompleted
}) {
const observer = new RouteObserver({
connection: this._connection,
onError,
onCompleted
})

this._connection.write(
RequestMessage.route(
{ ...routingContext, address: initialAddress },
databaseName
),
observer,
true
)

return observer
}
}
5 changes: 1 addition & 4 deletions src/internal/connection-provider-routing.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { READ, WRITE } from '../driver'
import Session from '../session'
import RoutingTable from './routing-table'
import Rediscovery from './rediscovery'
import { RoutingTableGetterFactory } from './routing-table-getter'
import { HostNameResolver } from './node'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
Expand Down Expand Up @@ -65,9 +64,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider

this._seedRouter = address
this._routingTables = {}
this._rediscovery = new Rediscovery(
new RoutingTableGetterFactory(routingContext, address.toString())
)
this._rediscovery = new Rediscovery(routingContext, address.toString())
this._loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(
this._connectionPool
)
Expand Down
4 changes: 3 additions & 1 deletion src/internal/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const BOLT_PROTOCOL_V3 = 3
const BOLT_PROTOCOL_V4_0 = 4.0
const BOLT_PROTOCOL_V4_1 = 4.1
const BOLT_PROTOCOL_V4_2 = 4.2
const BOLT_PROTOCOL_V4_3 = 4.3

export {
ACCESS_MODE_READ,
Expand All @@ -35,5 +36,6 @@ export {
BOLT_PROTOCOL_V3,
BOLT_PROTOCOL_V4_0,
BOLT_PROTOCOL_V4_1,
BOLT_PROTOCOL_V4_2
BOLT_PROTOCOL_V4_2,
BOLT_PROTOCOL_V4_3
}
11 changes: 9 additions & 2 deletions src/internal/protocol-handshaker.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import BoltProtocolV3 from './bolt-protocol-v3'
import BoltProtocolV4x0 from './bolt-protocol-v4x0'
import BoltProtocolV4x1 from './bolt-protocol-v4x1'
import BoltProtocolV4x2 from './bolt-protocol-v4x2'

import BoltProtocolV4x3 from './bolt-protocol-v4x3'
const BOLT_MAGIC_PREAMBLE = 0x6060b017

export default class ProtocolHandshaker {
Expand Down Expand Up @@ -132,6 +132,13 @@ export default class ProtocolHandshaker {
this._disableLosslessIntegers,
this._serversideRouting
)
case 4.3:
return new BoltProtocolV4x3(
this._connection,
this._chunker,
this._disableLosslessIntegers,
this._serversideRouting
)
default:
throw newError('Unknown Bolt protocol version: ' + version)
}
Expand All @@ -149,7 +156,7 @@ function newHandshakeBuffer () {
handshakeBuffer.writeInt32(BOLT_MAGIC_PREAMBLE)

// proposed versions
handshakeBuffer.writeInt32((2 << 8) | 4)
handshakeBuffer.writeInt32((3 << 8) | 4)
handshakeBuffer.writeInt32((1 << 8) | 4)
handshakeBuffer.writeInt32(4)
handshakeBuffer.writeInt32(3)
Expand Down
68 changes: 57 additions & 11 deletions src/internal/rediscovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
* limitations under the License.
*/
import RoutingTable from './routing-table'
import RawRoutingTable from './routing-table-raw'
import Session from '../session'
import { RoutingTableGetterFactory } from './routing-table-getter'
import ServerAddress from './server-address'
import { newError, SERVICE_UNAVAILABLE } from '../error'

const PROCEDURE_NOT_FOUND_CODE = 'Neo.ClientError.Procedure.ProcedureNotFound'
const DATABASE_NOT_FOUND_CODE = 'Neo.ClientError.Database.DatabaseNotFound'

export default class Rediscovery {
/**
* @constructor
* @param {RoutingTableGetterFactory} routingTableGetterFactory the util to use.
* @param {object} routingContext
* @param {string} initialAddress
*/
constructor (routingTableGetterFactory) {
this._routingTableGetterFactory = routingTableGetterFactory
constructor (routingContext, initialAddress) {
this._routingContext = routingContext
this._initialAddress = initialAddress
}

/**
Expand All @@ -39,15 +45,55 @@ export default class Rediscovery {
*/
lookupRoutingTableOnRouter (session, database, routerAddress) {
return session._acquireConnection(connection => {
const routingTableGetter = this._routingTableGetterFactory.create(
connection
)
return routingTableGetter.get(
return this._requestRawRoutingTable(
connection,
session,
database,
routerAddress,
session
)
routerAddress
).then(rawRoutingTable => {
if (rawRoutingTable.isNull) {
return null
}
return RoutingTable.fromRawRoutingTable(
database,
routerAddress,
rawRoutingTable
)
})
})
}

_requestRawRoutingTable (connection, session, database, routerAddress) {
return new Promise((resolve, reject) => {
connection.protocol().requestRoutingInformation({
routingContext: this._routingContext,
initialAddress: this._initialAddress,
databaseName: database,
sessionContext: {
bookmark: session._lastBookmark,
mode: session._mode,
database: session._database,
afterComplete: session._onComplete
},
onCompleted: resolve,
onError: error => {
if (error.code === DATABASE_NOT_FOUND_CODE) {
reject(error)
} else if (error.code === PROCEDURE_NOT_FOUND_CODE) {
// throw when getServers procedure not found because this is clearly a configuration issue
reject(
newError(
`Server at ${routerAddress.asHostPort()} can't perform routing. Make sure you are connecting to a causal cluster`,
SERVICE_UNAVAILABLE
)
)
} else {
// return nothing when failed to connect because code higher in the callstack is still able to retry with a
// different session towards a different router
resolve(RawRoutingTable.ofNull())
}
}
})
})
}
}
16 changes: 16 additions & 0 deletions src/internal/request-message.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const GOODBYE = 0x02 // 0000 0010 // GOODBYE
const BEGIN = 0x11 // 0001 0001 // BEGIN <metadata>
const COMMIT = 0x12 // 0001 0010 // COMMIT
const ROLLBACK = 0x13 // 0001 0011 // ROLLBACK
const ROUTE = 0x66 // 0110 0110 // ROUTE

const DISCARD = 0x2f // 0010 1111 // DISCARD
const PULL = 0x3f // 0011 1111 // PULL
Expand Down Expand Up @@ -209,6 +210,21 @@ export default class RequestMessage {
() => `DISCARD ${JSON.stringify(metadata)}`
)
}

/**
* Generate the ROUTE message, this message is used to fetch the routing table from the server
*
* @param {object} routingContext The routing context used to define the routing table. Multi-datacenter deployments is one of its use cases
* @param {string} databaseName The name of the database to get the routing table for.
* @return {RequestMessage} the ROUTE message.
*/
static route (routingContext = {}, databaseName = null) {
return new RequestMessage(
ROUTE,
[routingContext, databaseName],
() => `ROUTE ${JSON.stringify(routingContext)} ${databaseName}`
)
}
}

/**
Expand Down
Loading