Skip to content

Commit

Permalink
feat: support registering observables using any-observable
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesfer committed Sep 28, 2018
1 parent feebde0 commit 57b2089
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 11 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
]
},
"dependencies": {
"any-observable": "^0.3.0",
"any-promise": "^1.3.0",
"lodash": "^4.17.5",
"neo4j-driver": "^1.5.0",
Expand Down
15 changes: 10 additions & 5 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A flexible and intuitive query builder for Neo4j and Cypher.
Write queries in Javascript just as you would write them in Cypher.

- Easy to use fluent interface
- Support for streaming records using RxJS
- Support for streaming records using observables
- Full Typescript declarations included in package

```javascript
Expand Down Expand Up @@ -89,7 +89,7 @@ ES5

```javascript
db.matchNode('projects', 'Project')
.ret('projects')
.return('projects')
.run()
.then(function (results) {
// Do something with results
Expand All @@ -100,7 +100,7 @@ ES6

```javascript
const results = await db.matchNode('projects', 'Project')
.ret('projects')
.return('projects')
.run();
```

Expand Down Expand Up @@ -130,8 +130,7 @@ results = [
]
```

You can also use the `stream` method to download the results as an RxJS
observable.
You can also use the `stream` method to download the results as an observable.

```javascript
const results = db.matchNode('project', 'Project')
Expand All @@ -141,6 +140,12 @@ const results = db.matchNode('project', 'Project')
results.subscribe(row => console.log(row.project.properties.name));
```

Under the hood, the promises and observables used by this library are constructed
by [any-promise](https://github.com/kevinbeaty/any-promise) and
[any-observable](https://github.com/sindresorhus/any-observable) respectively. They
default to using the global Promise class and the RxJS observable library, but you
can change that by registering another implementation before importing this module.

### Processing

To extract the results, you can use ES5 array methods or a library like lodash:
Expand Down
41 changes: 35 additions & 6 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { TeardownLogic } from 'rxjs/Subscription';
import nodeCleanup = require('node-cleanup');
import { Dictionary, isFunction } from 'lodash';
import * as Promise from 'any-promise';
// import * as Observable from 'any-observable';

let connections: Connection[] = [];

Expand Down Expand Up @@ -252,17 +253,39 @@ export class Connection extends Builder<Query> {
* ```
*
* Notice how the steve record is returned for each row, this is how cypher
* works. You can extract all of steve's friends from the query by using RxJS
* works. You can extract all of steve's friends from the query by using
* operators:
* ```
* const friends$ = results$.map(row => row.friends);
* ```
*
* The observable class that is used is imported from
* [any-observable](https://github.com/sindresorhus/any-observable) by default
* it uses rxjs for the observables, but you can pick a different implementation
* by registering it with any-observable before importing this module.
*
* If you use typescript you can use the type parameter to hint at the type of
* the return value which is `Dictionary<R>`.
*
* Throws an exception if this connection is not open or there are no clauses
* in the query.
*
* The query is run when you call stream so you should subscribe to the results
* immediately to prevent missing any data.
*
* Due to the way the Neo4j javascript driver works, once you call stream there
* is no way to stop the query until it is complete. Even if you unsubscribe from
* the observable, all the remaining rows will still be parsed by the driver but
* then immediately discarded.
* ```typescript
* const results$ = connection.matchNode('records')
* .return('records')
* .limit(1000) // 1000 records will be loaded and parsed from the database
* .stream()
* .take(10) // even though you only take the first 10
* .subscribe(record => {});
* ```
* In practice this should never happen unless you're doing some strange things.
*/
stream<R = any>(query: Query): Observable<Dictionary<R>> {
if (!this.open) {
Expand All @@ -280,19 +303,25 @@ export class Connection extends Builder<Query> {
const result = session.run(queryObj.query, queryObj.params);

// Subscribe to the result and clean up the session
return Observable.create((subscriber: Observer<Dictionary<R>>): TeardownLogic => {
// Note: Neo4j observable uses a different syntax to RxJS observables
return new Observable((subscriber: Observer<Dictionary<R>>): void => {
// Note: Neo4j observables use a different subscribe syntax to RxJS observables
result.subscribe({
onNext: (record) => {
subscriber.next(this.transformer.transformRecord<R>(record));
if (!subscriber.closed) {
subscriber.next(this.transformer.transformRecord<R>(record));
}
},
onError: (error) => {
session.close();
subscriber.error(error);
if (!subscriber.closed) {
subscriber.error(error);
}
},
onCompleted: () => {
session.close();
subscriber.complete();
if (!subscriber.closed) {
subscriber.complete();
}
},
});
});
Expand Down
4 changes: 4 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ ansicolors@~0.2.1:
version "0.2.1"
resolved "https://registry.yarnpkg.com/ansicolors/-/ansicolors-0.2.1.tgz#be089599097b74a5c9c4a84a0cdbcdb62bd87aef"

any-observable@^0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.3.0.tgz#af933475e5806a67d0d7df090dd5e8bef65d119b"

any-promise@^1.3.0:
version "1.3.0"
resolved "https://registry.yarnpkg.com/any-promise/-/any-promise-1.3.0.tgz#abc6afeedcea52e809cdc0376aed3ce39635d17f"
Expand Down

0 comments on commit 57b2089

Please sign in to comment.