Skip to content

Commit

Permalink
fix(afs): Allow multiple subscribers by using share, closes #1191 (#1192
Browse files Browse the repository at this point in the history
)

* Allow multiple subscribers by using share
* Filter out duplicate inserts
* Added additional tests
  • Loading branch information
jamesdaniels authored Oct 5, 2017
1 parent ef3a8e3 commit 21522ab
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 6 deletions.
8 changes: 6 additions & 2 deletions src/firestore/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ export function combineChanges(current: firebase.firestore.DocumentChange[], cha
*/
export function combineChange(combined: firebase.firestore.DocumentChange[], change: firebase.firestore.DocumentChange): firebase.firestore.DocumentChange[] {
switch(change.type) {
case 'added':
combined.splice(change.newIndex, 0, change);
case 'added':
if (combined[change.newIndex] && combined[change.newIndex].doc.id == change.doc.id) {
// Not sure why the duplicates are getting fired
} else {
combined.splice(change.newIndex, 0, change);
}
break;
case 'modified':
// When an item changes position we first remove it
Expand Down
81 changes: 81 additions & 0 deletions src/firestore/collection/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,33 @@ describe('AngularFirestoreCollection', () => {

});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.valueChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should handle dynamic queries that return empty sets', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down Expand Up @@ -129,6 +156,33 @@ describe('AngularFirestoreCollection', () => {
});
});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.snapshotChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should update order on queries', async (done) => {
const ITEMS = 10;
let count = 0;
Expand Down Expand Up @@ -279,6 +333,33 @@ describe('AngularFirestoreCollection', () => {
}
});
});

it('should handle multiple subscriptions (hot)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.stateChanges();
const sub = changes.subscribe(() => {}).add(
changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
sub.unsubscribe();
})
).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});

it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.stateChanges();
changes.take(1).subscribe(() => {}).add(() => {
const sub = changes.take(1).subscribe(data => {
expect(data.length).toEqual(ITEMS);
}).add(() => {
deleteThemAll(names, ref).then(done).catch(done.fail);
});
});
});

it('should be able to filter stateChanges() types - modified', async (done) => {
const ITEMS = 10;
Expand Down
6 changes: 3 additions & 3 deletions src/firestore/collection/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class AngularFirestoreCollection<T> {
}

/**
* Create a stream of synchronized shanges. This method keeps the local array in sorted
* Create a stream of synchronized changes. This method keeps the local array in sorted
* query order.
* @param events
*/
Expand All @@ -96,8 +96,8 @@ export class AngularFirestoreCollection<T> {
* Listen to all documents in the collection and its possible query as an Observable.
*/
valueChanges(events?: firebase.firestore.DocumentChangeType[]): Observable<T[]> {
return this.snapshotChanges()
.map(actions => actions.map(a => a.payload.doc.data()) as T[]);
return fromCollectionRef(this.query)
.map(actions => actions.payload.docs.map(a => a.data()) as T[]);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion src/firestore/observable/fromRef.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import { Subscription } from 'rxjs/Subscription';
import { observeOn } from 'rxjs/operator/observeOn';
import { ZoneScheduler } from 'angularfire2';
import { Action, Reference } from '../interfaces';

import 'rxjs/add/operator/map';
import 'rxjs/add/operator/share';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
const ref$ = new Observable(subscriber => {
Expand All @@ -16,7 +18,7 @@ function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
}

export function fromRef<R>(ref: firebase.firestore.DocumentReference | firebase.firestore.Query) {
return _fromRef<typeof ref, R>(ref);
return _fromRef<typeof ref, R>(ref).share();
}

export function fromDocRef(ref: firebase.firestore.DocumentReference): Observable<Action<firebase.firestore.DocumentSnapshot>>{
Expand Down
1 change: 1 addition & 0 deletions tools/build.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const GLOBALS = {
'rxjs/add/observable/fromPromise': 'Rx.Observable.prototype',
'rxjs/add/operator/delay': 'Rx.Observable',
'rxjs/add/operator/debounce': 'Rx.Observable',
'rxjs/add/operator/share': 'Rx.Observable',
'rxjs/observable/fromEvent': 'Rx.Observable',
'rxjs/observable/from': 'Rx.Observable',
'rxjs/operator': 'Rx.Observable.prototype',
Expand Down

0 comments on commit 21522ab

Please sign in to comment.