Skip to content

Commit

Permalink
fix: make error handling more consistent
Browse files Browse the repository at this point in the history
The run and stream methods on both the Connection and Query classes handled errors inconsistently,
sometimes throwing errors synchronously and sometimes returning rejected promises or observables.
This commit aims to make all of them consistent by always returning a promise or observable.

BREAKING CHANGE: The run and stream methods of the Connection and Query classes no longer throw
exceptions. Instead they return a rejected promise or an observable that will immediately error.
  • Loading branch information
jamesfer committed Sep 20, 2019
1 parent 8fe9218 commit 56a7591
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 63 deletions.
44 changes: 27 additions & 17 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,16 @@ export class Connection extends Builder<Query> {
* @returns {Promise<Dictionary<R>[]>}
*/
run<R = any>(query: Query): Promise<Dictionary<R>[]> {
if (!this.open) {
return AnyPromise.reject(
new Error('Cannot run query; connection is not open.'),
) as Promise<Dictionary<R>[]>;
}

if (query.getClauses().length === 0) {
throw Error('Cannot run query: no clauses attached to the query.');
return AnyPromise.reject(
new Error('Cannot run query: no clauses attached to the query.'),
) as Promise<Dictionary<R>[]>;
}

const session = this.session();
Expand All @@ -229,7 +237,7 @@ export class Connection extends Builder<Query> {
.catch((error) => {
session.close();
return Promise.reject(error);
}) as any;
}) as Promise<Dictionary<R>[]>;
}

/**
Expand Down Expand Up @@ -300,25 +308,27 @@ export class Connection extends Builder<Query> {
* In practice this should never happen unless you're doing some strange things.
*/
stream<R = any>(query: Query): Observable<Dictionary<R>> {
if (!this.open) {
throw Error('Cannot run query; connection is not open.');
}
return new Observable((subscriber: Observer<Dictionary<R>>): void => {
if (!this.open) {
subscriber.error(new Error('Cannot run query; connection is not open.'));
return;
}

if (query.getClauses().length === 0) {
throw Error('Cannot run query: no clauses attached to the query.');
}
if (query.getClauses().length === 0) {
subscriber.error(Error('Cannot run query: no clauses attached to the query.'));
return;
}

const session = this.session();
if (!session) {
throw Error('Cannot run query: connection is not open.');
}
const session = this.session();
if (!session) {
throw Error('Cannot run query: connection is not open.');
}

// Run the query
const queryObj = query.buildQueryObject();
const result = session.run(queryObj.query, queryObj.params);
// Run the query
const queryObj = query.buildQueryObject();
const result = session.run(queryObj.query, queryObj.params);

// Subscribe to the result and clean up the session
return new Observable((subscriber: Observer<Dictionary<R>>): void => {
// Subscribe to the result and clean up the session
// Note: Neo4j observables use a different subscribe syntax to RxJS observables
result.subscribe({
onNext: (record) => {
Expand Down
24 changes: 17 additions & 7 deletions src/query.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { Query } from './query';
import { expect } from '../test-setup';
// tslint:disable-next-line import-name
import Observable from 'any-observable';
import { Dictionary, each } from 'lodash';
import { node, NodePattern } from './clauses';
import { mockConnection } from '../tests/connection.mock';
import { spy, stub } from 'sinon';
import { expect } from '../test-setup';
import { mockConnection } from '../tests/connection.mock';
import { ClauseCollection } from './clause-collection';
import { node, NodePattern } from './clauses';
import { Query } from './query';

describe('Query', () => {
describe('query methods', () => {
Expand Down Expand Up @@ -96,9 +98,17 @@ describe('Query', () => {
});

describe('#stream', () => {
it('should throw if there is no attached connection object', () => {
const query = new Query();
expect(() => query.stream()).to.throw(Error, 'no connection object available');
it('should return an errored observable if there is no attached connection object', () => {
const observable = new Query().stream();
expect(observable).to.be.an.instanceOf(Observable);
observable.subscribe({
next: () => expect.fail(null, null, 'Observable should not emit anything'),
error(error) {
expect(error).to.be.instanceOf(Error);
expect(error.message).to.include('no connection object available');
},
complete: () => expect.fail(null, null, 'Observable should not complete successfully'),
});
});

it('should run the query on its connection', () => {
Expand Down
20 changes: 10 additions & 10 deletions src/query.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
// tslint:disable-next-line import-name
import AnyPromise from 'any-promise';
// tslint:disable-next-line import-name
import Observable from 'any-observable';
import { Observable as RxObservable } from 'rxjs';
import { Dictionary } from 'lodash';
import { Connection } from './connection';
import { Connection, Observer } from './connection';
import { Builder } from './builder';
import { ClauseCollection } from './clause-collection';
import { Clause, QueryObject } from './clause';
Expand Down Expand Up @@ -75,16 +77,12 @@ export class Query extends Builder<Query> {
*/
run<R = any>(): Promise<Dictionary<R>[]> {
if (!this.connection) {
return AnyPromise.reject(Error('Cannot run query; no connection object available.')) as any;
return AnyPromise.reject(
new Error('Cannot run query; no connection object available.'),
) as Promise<Dictionary<R>[]>;
}

// connection.run can throw errors synchronously. This is highly inconsistent and will be
// fixed in the future, but for now we need to catch synchronous errors and reject them.
try {
return this.connection.run<R>(this);
} catch (error) {
return AnyPromise.reject(error) as any;
}
return this.connection.run<R>(this);
}

/**
Expand Down Expand Up @@ -135,7 +133,9 @@ export class Query extends Builder<Query> {
*/
stream<R = any>(): RxObservable<Dictionary<R>> {
if (!this.connection) {
throw Error('Cannot run query; no connection object available.');
return new Observable((subscriber: Observer<Dictionary<R>>): void => {
subscriber.error(new Error('Cannot run query; no connection object available.'));
});
}

return this.connection.stream<R>(this);
Expand Down
85 changes: 56 additions & 29 deletions tests/connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import Observable from 'any-observable';
import { Dictionary, each } from 'lodash';
import { tap } from 'rxjs/operators';
import { SinonSpy, spy } from 'sinon';
import { v1 as neo4j } from 'neo4j-driver';
import { AuthToken, Config } from 'neo4j-driver/types/v1/driver';
import { Driver } from 'neo4j-driver/types/v1';
import { AuthToken, Config } from 'neo4j-driver/types/v1/driver';
import { SinonSpy, spy } from 'sinon';
import { Connection, Node, Query } from '../src';
import { NodePattern } from '../src/clauses';
import { expect } from '../test-setup';
Expand Down Expand Up @@ -108,15 +108,15 @@ describe('Connection', () => {
});

describe('#run', () => {
it('should throw if there are no clauses in the query', () => {
const run = () => connection.run(connection.query());
expect(run).to.throw(Error, 'no clauses');
it('should reject if there are no clauses in the query', () => {
const promise = connection.run(connection.query());
expect(promise).to.be.rejectedWith(Error, 'no clauses');
});

it('should throw if the connection has been closed', () => {
it('should reject if the connection has been closed', () => {
connection.close();
const run = () => connection.run(connection.query().matchNode('node'));
expect(run).to.throw(Error, 'connection is not open');
const promise = connection.run(connection.query().matchNode('node'));
expect(promise).to.be.rejectedWith(Error, 'connection is not open');
});

it('should run the query through a session', () => {
Expand Down Expand Up @@ -171,15 +171,33 @@ describe('Connection', () => {
connection.close();
});

it('should throw if there are no clauses in the query', () => {
const stream = () => connection.stream(connection.query());
expect(stream).to.throw(Error, 'no clauses');
it('should return errored observable if there are no clauses in the query', () => {
const observable = connection.stream(connection.query());
expect(observable).to.be.an.instanceOf(Observable);

observable.subscribe({
next: () => expect.fail(null, null, 'Observable should not emit anything'),
error(error) {
expect(error).to.be.instanceOf(Error);
expect(error.message).to.include('no clauses');
},
complete: () => expect.fail(null, null, 'Observable should not complete successfully'),
});
});

it('should throw if the connection has been closed', () => {
it('should return errored observable if the connection has been closed', () => {
connection.close();
const stream = () => connection.stream(query);
expect(stream).to.throw(Error, 'connection is not open');
const observable = connection.stream(query);
expect(observable).to.be.an.instanceOf(Observable);

observable.subscribe({
next: () => expect.fail(null, null, 'Observable should not emit anything'),
error(error) {
expect(error).to.be.instanceOf(Error);
expect(error.message).to.include('connection is not open');
},
complete: () => expect.fail(null, null, 'Observable should not complete successfully'),
});
});

it('should run the query through a session', () => {
Expand Down Expand Up @@ -216,7 +234,7 @@ describe('Connection', () => {
expect(observable).to.be.an.instanceOf(Observable);
observable.subscribe({
next: () => expect.fail(null, null, 'Observable should not emit any items'),
error: () => {
error() {
expect(sessionCloseSpy.calledOnce);
done();
},
Expand All @@ -227,32 +245,41 @@ describe('Connection', () => {

describe('query methods', () => {
const methods: Dictionary<Function> = {
query: () => connection.query(),
matchNode: () => connection.matchNode('Node'),
match: () => connection.match(new NodePattern('Node')),
optionalMatch: () => connection.optionalMatch(new NodePattern('Node')),
create: () => connection.create(new NodePattern('Node')),
createUnique: () => connection.createUnique(new NodePattern('Node')),
createNode: () => connection.createNode('Node'),
createUnique: () => connection.createUnique(new NodePattern('Node')),
createUniqueNode: () => connection.createUniqueNode('Node'),
return: () => connection.return('node'),
returnDistinct: () => connection.returnDistinct('node'),
delete: () => connection.delete('node'),
detachDelete: () => connection.detachDelete('node'),
limit: () => connection.limit(1),
match: () => connection.match(new NodePattern('Node')),
matchNode: () => connection.matchNode('Node'),
merge: () => connection.merge(new NodePattern('Node')),
onCreateSet: () => connection.onCreate.set({}, { merge: false }),
onCreateSetLabels: () => connection.onCreate.setLabels({}),
onCreateSetValues: () => connection.onCreate.setValues({}),
onCreateSetVariables: () => connection.onCreate.setVariables({}, false),
onMatchSet: () => connection.onMatch.set({}, { merge: false }),
onMatchSetLabels: () => connection.onMatch.setLabels({}),
onMatchSetValues: () => connection.onMatch.setValues({}),
onMatchSetVariables: () => connection.onMatch.setVariables({}, false),
optionalMatch: () => connection.optionalMatch(new NodePattern('Node')),
orderBy: () => connection.orderBy('name'),
query: () => connection.query(),
raw: () => connection.raw('name'),
remove: () => connection.remove({ properties: { node: ['prop1', 'prop2'] } }),
removeProperties: () => connection.removeProperties({ node: ['prop1', 'prop2'] }),
removeLabels: () => connection.removeLabels({ node: 'label' }),
with: () => connection.with('node'),
unwind: () => connection.unwind([1, 2, 3], 'number'),
delete: () => connection.delete('node'),
detachDelete: () => connection.detachDelete('node'),
return: () => connection.return('node'),
returnDistinct: () => connection.returnDistinct('node'),
set: () => connection.set({}, { merge: false }),
setLabels: () => connection.setLabels({}),
setValues: () => connection.setValues({}),
setVariables: () => connection.setVariables({}, false),
skip: () => connection.skip(1),
limit: () => connection.limit(1),
unwind: () => connection.unwind([1, 2, 3], 'number'),
where: () => connection.where([]),
orderBy: () => connection.orderBy('name'),
raw: () => connection.raw('name'),
with: () => connection.with('node'),
};

each(methods, (fn, name) => {
Expand Down

0 comments on commit 56a7591

Please sign in to comment.