Skip to content

Commit

Permalink
Merge pull request #408 from lutovich/1.7-resolver
Browse files Browse the repository at this point in the history
Configurable server address resolver
  • Loading branch information
zhenlineo authored Sep 10, 2018
2 parents c7c7123 + 1024a44 commit cf14dcc
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 13 deletions.
14 changes: 14 additions & 0 deletions src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,20 @@ const logging = {
* level: 'info',
* logger: (level, message) => console.log(level + ' ' + message)
* },
*
* // Specify a custom server address resolver function used by the routing driver to resolve the initial address used to create the driver.
* // Such resolution happens:
* // * during the very first rediscovery when driver is created
* // * when all the known routers from the current routing table have failed and driver needs to fallback to the initial address
* //
* // In NodeJS environment driver defaults to performing a DNS resolution of the initial address using 'dns' module.
* // In browser environment driver uses the initial address as-is.
* // Value should be a function that takes a single string argument - the initial address. It should return an array of new addresses.
* // Address is a string of shape '<host>:<port>'. Provided function can return either a Promise resolved with an array of addresses
* // or array of addresses directly.
* resolver: function(address) {
* return ['127.0.0.1:8888', 'fallback.db.com:7687'];
* },
* }
*
* @param {string} url The URL for the Neo4j database, for instance "bolt://localhost"
Expand Down
7 changes: 4 additions & 3 deletions src/v1/internal/connection-providers.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ export class DirectConnectionProvider extends ConnectionProvider {

export class LoadBalancer extends ConnectionProvider {

constructor(hostPort, routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback, log) {
constructor(hostPort, routingContext, connectionPool, loadBalancingStrategy, hostNameResolver, driverOnErrorCallback, log) {
super();
this._seedRouter = hostPort;
this._routingTable = new RoutingTable([this._seedRouter]);
this._rediscovery = new Rediscovery(new RoutingUtil(routingContext));
this._connectionPool = connectionPool;
this._driverOnErrorCallback = driverOnErrorCallback;
this._hostNameResolver = LoadBalancer._createHostNameResolver();
this._loadBalancingStrategy = loadBalancingStrategy;
this._hostNameResolver = hostNameResolver;
this._log = log;
this._useSeedRouter = false;
}
Expand Down Expand Up @@ -175,7 +175,8 @@ export class LoadBalancer extends ConnectionProvider {
}

_fetchRoutingTableUsingSeedRouter(seenRouters, seedRouter) {
return this._hostNameResolver.resolve(seedRouter).then(resolvedRouterAddresses => {
const resolvedAddresses = this._hostNameResolver.resolve(seedRouter);
return resolvedAddresses.then(resolvedRouterAddresses => {
// filter out all addresses that we've already tried
const newAddresses = resolvedRouterAddresses.filter(address => seenRouters.indexOf(address) < 0);
return this._fetchRoutingTable(newAddresses, null);
Expand Down
19 changes: 19 additions & 0 deletions src/v1/internal/host-name-resolvers.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ export class DummyHostNameResolver extends HostNameResolver {
}
}

export class ConfiguredHostNameResolver extends HostNameResolver {

constructor(resolverFunction) {
super();
this._resolverFunction = resolverFunction;
}

resolve(seedRouter) {
return new Promise(resolve => resolve(this._resolverFunction(seedRouter)))
.then(resolved => {
if (!Array.isArray(resolved)) {
throw new TypeError(`Configured resolver function should either return an array of addresses or a Promise resolved with an array of addresses.` +
`Each address is '<host>:<port>'. Got: ${resolved}`);
}
return resolved;
});
}
}

export class DnsHostNameResolver extends HostNameResolver {

constructor() {
Expand Down
26 changes: 24 additions & 2 deletions src/v1/routing-driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import {LoadBalancer} from './internal/connection-providers';
import LeastConnectedLoadBalancingStrategy, {LEAST_CONNECTED_STRATEGY_NAME} from './internal/least-connected-load-balancing-strategy';
import RoundRobinLoadBalancingStrategy, {ROUND_ROBIN_STRATEGY_NAME} from './internal/round-robin-load-balancing-strategy';
import ConnectionErrorHandler from './internal/connection-error-handler';
import hasFeature from './internal/features';
import {ConfiguredHostNameResolver, DnsHostNameResolver, DummyHostNameResolver} from './internal/host-name-resolvers';

/**
* A driver that supports routing in a causal cluster.
Expand All @@ -41,7 +43,8 @@ class RoutingDriver extends Driver {

_createConnectionProvider(hostPort, connectionPool, driverOnErrorCallback) {
const loadBalancingStrategy = RoutingDriver._createLoadBalancingStrategy(this._config, connectionPool);
return new LoadBalancer(hostPort, this._routingContext, connectionPool, loadBalancingStrategy, driverOnErrorCallback, this._log);
const resolver = createHostNameResolver(this._config);
return new LoadBalancer(hostPort, this._routingContext, connectionPool, loadBalancingStrategy, resolver, driverOnErrorCallback, this._log);
}

_createConnectionErrorHandler() {
Expand Down Expand Up @@ -85,12 +88,31 @@ class RoutingDriver extends Driver {

/**
* @private
* @returns {HostNameResolver} new resolver.
*/
function createHostNameResolver(config) {
if (config.resolver) {
return new ConfiguredHostNameResolver(config.resolver);
}
if (hasFeature('dns_lookup')) {
return new DnsHostNameResolver();
}
return new DummyHostNameResolver();
}

/**
* @private
* @returns {object} the given config.
*/
function validateConfig(config) {
if (config.trust === 'TRUST_ON_FIRST_USE') {
throw newError('The chosen trust mode is not compatible with a routing driver');
}
const resolver = config.resolver;
if (resolver && typeof resolver !== 'function') {
throw new TypeError(`Configured resolver should be a function. Got: ${resolver}`);
}
return config;
}

export default RoutingDriver
export default RoutingDriver;
8 changes: 3 additions & 5 deletions test/internal/bolt-stub.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,10 @@ class StubServer {
}
}

function newDriver(url) {
function newDriver(url, config = {}) {
// boltstub currently does not support encryption, create driver with encryption turned off
const config = {
encrypted: 'ENCRYPTION_OFF'
};
return neo4j.driver(url, sharedNeo4j.authToken, config);
const newConfig = Object.assign({encrypted: 'ENCRYPTION_OFF'}, config);
return neo4j.driver(url, sharedNeo4j.authToken, newConfig);
}

const supportedStub = SupportedBoltStub.create();
Expand Down
7 changes: 5 additions & 2 deletions test/internal/connection-providers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {DirectConnectionProvider, LoadBalancer} from '../../src/v1/internal/conn
import Pool from '../../src/v1/internal/pool';
import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy';
import Logger from '../../src/v1/internal/logger';
import {DummyHostNameResolver} from '../../src/v1/internal/host-name-resolvers';

const NO_OP_DRIVER_CALLBACK = () => {
};
Expand Down Expand Up @@ -138,7 +139,8 @@ describe('LoadBalancer', () => {
it('initializes routing table with the given router', () => {
const connectionPool = newPool();
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(connectionPool);
const loadBalancer = new LoadBalancer('server-ABC', {}, connectionPool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK, Logger.noOp());
const loadBalancer = new LoadBalancer('server-ABC', {}, connectionPool, loadBalancingStrategy, new DummyHostNameResolver(),
NO_OP_DRIVER_CALLBACK, Logger.noOp());

expectRoutingTable(loadBalancer,
['server-ABC'],
Expand Down Expand Up @@ -1074,7 +1076,8 @@ function newLoadBalancerWithSeedRouter(seedRouter, seedRouterResolved,
connectionPool = null) {
const pool = connectionPool || newPool();
const loadBalancingStrategy = new LeastConnectedLoadBalancingStrategy(pool);
const loadBalancer = new LoadBalancer(seedRouter, {}, pool, loadBalancingStrategy, NO_OP_DRIVER_CALLBACK, Logger.noOp());
const loadBalancer = new LoadBalancer(seedRouter, {}, pool, loadBalancingStrategy, new DummyHostNameResolver(),
NO_OP_DRIVER_CALLBACK, Logger.noOp());
loadBalancer._routingTable = new RoutingTable(routers, readers, writers, expirationTime);
loadBalancer._rediscovery = new FakeRediscovery(routerToRoutingTable);
loadBalancer._hostNameResolver = new FakeDnsResolver(seedRouterResolved);
Expand Down
1 change: 1 addition & 0 deletions test/resources/boltstub/get_routing_table.script
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ S: SUCCESS {"fields": ["name"]}
RECORD ["Bob"]
RECORD ["Eve"]
SUCCESS {}
S: <EXIT>
7 changes: 7 additions & 0 deletions test/v1/routing-driver.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import RoundRobinLoadBalancingStrategy from '../../src/v1/internal/round-robin-l
import LeastConnectedLoadBalancingStrategy from '../../src/v1/internal/least-connected-load-balancing-strategy';
import RoutingDriver from '../../src/v1/routing-driver';
import Pool from '../../src/v1/internal/pool';
import neo4j from '../../src/v1';

describe('RoutingDriver', () => {

Expand All @@ -43,6 +44,12 @@ describe('RoutingDriver', () => {
expect(() => createStrategy({loadBalancingStrategy: 'wrong'})).toThrow();
});

it('should fail when configured resolver is of illegal type', () => {
expect(() => neo4j.driver('bolt+routing://localhost', {}, {resolver: 'string instead of a function'})).toThrowError(TypeError);
expect(() => neo4j.driver('bolt+routing://localhost', {}, {resolver: []})).toThrowError(TypeError);
expect(() => neo4j.driver('bolt+routing://localhost', {}, {resolver: {}})).toThrowError(TypeError);
});

});

function createStrategy(config) {
Expand Down
153 changes: 152 additions & 1 deletion test/v1/routing.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import neo4j from '../../src/v1';
import {READ, WRITE} from '../../src/v1/driver';
import boltStub from '../internal/bolt-stub';
import RoutingTable from '../../src/v1/internal/routing-table';
import {SESSION_EXPIRED} from '../../src/v1/error';
import {SERVICE_UNAVAILABLE, SESSION_EXPIRED} from '../../src/v1/error';
import lolex from 'lolex';

describe('routing driver with stub server', () => {
Expand Down Expand Up @@ -1915,6 +1915,89 @@ describe('routing driver with stub server', () => {
testAddressPurgeOnDatabaseError(`RETURN 1`, READ, done);
});

it('should use resolver function that returns array during first discovery', done => {
testResolverFunctionDuringFirstDiscovery(['127.0.0.1:9010'], done);
});

it('should use resolver function that returns promise during first discovery', done => {
testResolverFunctionDuringFirstDiscovery(Promise.resolve(['127.0.0.1:9010']), done);
});

it('should fail first discovery when configured resolver function throws', done => {
const failureFunction = () => {
throw new Error('Broken resolver');
};
testResolverFunctionFailureDuringFirstDiscovery(failureFunction, null, 'Broken resolver', done);
});

it('should fail first discovery when configured resolver function returns no addresses', done => {
const failureFunction = () => {
return [];
};
testResolverFunctionFailureDuringFirstDiscovery(failureFunction, SERVICE_UNAVAILABLE, 'No routing servers available', done);
});

it('should fail first discovery when configured resolver function returns a string instead of array of addresses', done => {
const failureFunction = () => {
return 'Hello';
};
testResolverFunctionFailureDuringFirstDiscovery(failureFunction, null, 'Configured resolver function should either return an array of addresses', done);
});

it('should use resolver function during rediscovery when existing routers fail', done => {
if (!boltStub.supported) {
done();
return;
}

const router1 = boltStub.start('./test/resources/boltstub/get_routing_table.script', 9001);
const router2 = boltStub.start('./test/resources/boltstub/acquire_endpoints.script', 9042);
const reader = boltStub.start('./test/resources/boltstub/read_server.script', 9005);

boltStub.run(() => {
const resolverFunction = address => {
if (address === '127.0.0.1:9001') {
return ['127.0.0.1:9010', '127.0.0.1:9011', '127.0.0.1:9042'];
}
throw new Error(`Unexpected address ${address}`);
};

const driver = boltStub.newDriver('bolt+routing://127.0.0.1:9001', {resolver: resolverFunction});

const session = driver.session(READ);
// run a query that should trigger discovery against 9001 and then read from it
session.run('MATCH (n) RETURN n.name AS name')
.then(result => {
expect(result.records.map(record => record.get(0))).toEqual(['Alice', 'Bob', 'Eve']);

// 9001 should now exit and read transaction should fail to read from all existing readers
// it should then rediscover using addresses from resolver, only 9042 of them works and can respond with table containing reader 9005
session.readTransaction(tx => tx.run('MATCH (n) RETURN n.name'))
.then(result => {
expect(result.records.map(record => record.get(0))).toEqual(['Bob', 'Alice', 'Tina']);

assertHasRouters(driver, ['127.0.0.1:9001', '127.0.0.1:9002', '127.0.0.1:9003']);
assertHasReaders(driver, ['127.0.0.1:9005', '127.0.0.1:9006']);
assertHasWriters(driver, ['127.0.0.1:9007', '127.0.0.1:9008']);

session.close(() => {
driver.close();
router1.exit(code1 => {
router2.exit(code2 => {
reader.exit(code3 => {
expect(code1).toEqual(0);
expect(code2).toEqual(0);
expect(code3).toEqual(0);
done();
});
});
});
});
}).catch(done.fail);
}).catch(done.fail);
});
});

function testAddressPurgeOnDatabaseError(query, accessMode, done) {
if (!boltStub.supported) {
done();
Expand Down Expand Up @@ -2146,6 +2229,74 @@ describe('routing driver with stub server', () => {
return Object.keys(driver._openConnections).length;
}

function testResolverFunctionDuringFirstDiscovery(resolutionResult, done) {
if (!boltStub.supported) {
done();
return;
}

const router = boltStub.start('./test/resources/boltstub/acquire_endpoints.script', 9010);
const reader = boltStub.start('./test/resources/boltstub/read_server.script', 9005);

boltStub.run(() => {
const resolverFunction = address => {
if (address === 'neo4j.com:7687') {
return resolutionResult;
}
throw new Error(`Unexpected address ${address}`);
};

const driver = boltStub.newDriver('bolt+routing://neo4j.com', {resolver: resolverFunction});

const session = driver.session(READ);
session.run('MATCH (n) RETURN n.name')
.then(result => {
expect(result.records.map(record => record.get(0))).toEqual(['Bob', 'Alice', 'Tina']);
session.close(() => {
driver.close();

router.exit(code1 => {
reader.exit(code2 => {
expect(code1).toEqual(0);
expect(code2).toEqual(0);
done();
});
});
});
})
.catch(done.fail);
});
}

function testResolverFunctionFailureDuringFirstDiscovery(failureFunction, expectedCode, expectedMessage, done) {
if (!boltStub.supported) {
done();
return;
}

const resolverFunction = address => {
if (address === 'neo4j.com:8989') {
return failureFunction();
}
throw new Error('Unexpected address');
};

const driver = boltStub.newDriver('bolt+routing://neo4j.com:8989', {resolver: resolverFunction});
const session = driver.session();

session.run('RETURN 1')
.then(result => done.fail(result))
.catch(error => {
if (expectedCode) {
expect(error.code).toEqual(expectedCode);
}
if (expectedMessage) {
expect(error.message.indexOf(expectedMessage)).toBeGreaterThan(-1);
}
done();
});
}

class MemorizingRoutingTable extends RoutingTable {

constructor(initialTable) {
Expand Down

0 comments on commit cf14dcc

Please sign in to comment.