Skip to content

Commit

Permalink
feat(operator): implement last() operator with LastMachine
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 26, 2016
1 parent ad210fc commit 747e255
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {TakeMachine} from './operator/TakeMachine';
import {SkipMachine} from './operator/SkipMachine';
import {DebugMachine} from './operator/DebugMachine';
import {FoldMachine} from './operator/FoldMachine';
import {LastMachine} from './operator/LastMachine';

export class Stream<T> implements Observer<T> {
public observers: Array<Observer<T>>;
Expand Down Expand Up @@ -73,4 +74,8 @@ export class Stream<T> implements Observer<T> {
fold<R>(accumulator: (acc: R, t: T) => R, initAcc: R): Stream<R> {
return new Stream<R>(new FoldMachine(accumulator, initAcc, this));
}

last(): Stream<T> {
return new Stream<T>(new LastMachine(this));
}
}
36 changes: 36 additions & 0 deletions src/operator/LastMachine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {Observer} from '../Observer';
import {Machine} from '../Machine';
import {Stream} from '../Stream';
import {emptyObserver} from '../utils/emptyObserver';

export class LastMachine<T> implements Machine<T> {
public proxy: Observer<T> = emptyObserver;
public has: boolean = false;
public val: T;

constructor(public inStream: Stream<T>) {
}

start(outStream: Stream<T>): void {
this.proxy = {
next: (t: T) => {
this.has = true;
this.val = t;
},
error: (err) => outStream.error(err),
complete: () => {
if (this.has) {
outStream.next(this.val);
outStream.complete();
} else {
outStream.error('TODO show error about empty stream has no last value');
}
},
};
this.inStream.subscribe(this.proxy);
}

stop(): void {
this.inStream.unsubscribe(this.proxy);
}
}
19 changes: 19 additions & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,22 @@ describe('Stream.prototype.debug', () => {
stream.subscribe(observer);
});
});

describe('Stream.prototype.last', () => {
it('should emit only the last value from a stream', (done) => {
const expected = [50];
const stream = xs.from([10, 20, 30, 40, 50]).last();
let observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: done.fail,
complete: () => {
assert.equal(expected.length, 0);
stream.unsubscribe(observer);
done();
},
};
stream.subscribe(observer);
});
});

0 comments on commit 747e255

Please sign in to comment.