Skip to content

Commit

Permalink
feat(Observable): add pairwise operator
Browse files Browse the repository at this point in the history
bring in pairwise operator from RxJS4
  • Loading branch information
justinwoo authored and benlesh committed Jan 4, 2016
1 parent 69ebd9a commit 1432e59
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 1 deletion.
3 changes: 2 additions & 1 deletion doc/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
- [mergeAll](function/index.html#static-function-mergeAll)
- [multicast](function/index.html#static-function-multicast)
- [observeOn](function/index.html#static-function-observeOn)
- [pairwise](function/index.html#static-function-pairwise)
- [partition](function/index.html#static-function-partition)
- [publish](function/index.html#static-function-publish)
- [publishBehavior](function/index.html#static-function-publishBehavior)
Expand Down Expand Up @@ -89,4 +90,4 @@
- [windowWhen](function/index.html#static-function-windowWhen)
- [withLatestFrom](function/index.html#static-function-withLatestFrom)
- [zip](function/index.html#static-function-zip)
- [zipAll](function/index.html#static-function-zipAll)
- [zipAll](function/index.html#static-function-zipAll)
18 changes: 18 additions & 0 deletions perf/micro/immediate-scheduler/operators/pairwise.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldPairwiseWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).pairwise();
var newPairwiseWithImmediateScheduler = RxNew.Observable.range(0, 25).pairwise();

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old pairwise with immediate scheduler', function () {
oldPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new pairwise with immediate scheduler', function () {
newPairwiseWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
88 changes: 88 additions & 0 deletions spec/operators/pairwise-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, cold, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.pairwise()', function () {
it('should pairwise things', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var e1subs = '^ !';
var expected = '------v--w--x--y--z--|';

var values = {
v: ['b', 'c'],
w: ['c', 'd'],
x: ['d', 'e'],
y: ['e', 'f'],
z: ['f', 'g']
};

var source = e1.pairwise();

expectObservable(source).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not emit on single-element streams', function () {
var e1 = hot('-----^--b----|');
var e1subs = '^ !';
var expected = '--------|';

var values = {
};

var source = e1.pairwise();

expectObservable(source).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should handle mid-stream throw', function () {
var e1 = hot('--a--^--b--c--d--e--#');
var e1subs = '^ !';
var expected = '------v--w--x--#';

var values = {
v: ['b', 'c'],
w: ['c', 'd'],
x: ['d', 'e']
};

var source = e1.pairwise();

expectObservable(source).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should handle empty', function () {
var e1 = cold('|');
var e1subs = '(^!)';
var expected = '|';

var source = e1.pairwise();

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should handle never', function () {
var e1 = cold('-');
var e1subs = '^';
var expected = '-';

var source = e1.pairwise();

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should handle throw', function () {
var e1 = cold('#');
var e1subs = '(^!)';
var expected = '#';

var source = e1.pairwise();

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
2 changes: 2 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
findIndex?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<number>;
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
pairwise?: <R>() => Observable<R>;
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R, concurrent?: number) => Observable<R>;
exhaust?: () => Observable<T>;
Expand Down Expand Up @@ -89,6 +90,7 @@ import './add/operator/mergeScan';
import './add/operator/min';
import './add/operator/multicast';
import './add/operator/observeOn';
import './add/operator/pairwise';
import './add/operator/partition';
import './add/operator/publish';
import './add/operator/publishBehavior';
Expand Down
7 changes: 7 additions & 0 deletions src/add/operator/pairwise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {Observable} from '../../Observable';
import {pairwise} from '../../operator/pairwise';
import {KitchenSinkOperators} from '../../Rx.KitchenSink';
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
observableProto.pairwise = pairwise;

export var _void: void;
38 changes: 38 additions & 0 deletions src/operator/pairwise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';

/**
* Returns a new observable that triggers on the second and following inputs.
* An input that triggers an event will return an pair of [(N - 1)th, Nth].
* The (N-1)th is stored in the internal state until Nth input occurs.
* @returns {Observable<R>} an observable of pairs of values.
*/
export function pairwise<T>(): Observable<T> {
return this.lift(new PairwiseOperator());
}

class PairwiseOperator<T, R> implements Operator<T, R> {
call(subscriber: Subscriber<T>): Subscriber<T> {
return new PairwiseSubscriber(subscriber);
}
}

class PairwiseSubscriber<T> extends Subscriber<T> {
private prev: T;
private hasPrev: boolean = false;

constructor(destination: Subscriber<T>) {
super(destination);
}

_next(value: T): void {
if (this.hasPrev) {
this.destination.next([this.prev, value]);
} else {
this.hasPrev = true;
}

this.prev = value;
}
}

0 comments on commit 1432e59

Please sign in to comment.