Skip to content

Commit

Permalink
fix(operator): fix all operators redirection of error/complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Feb 25, 2016
1 parent e76acef commit 2caa2ca
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 15 deletions.
6 changes: 3 additions & 3 deletions src/operator/DebugMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class DebugMachine<T> implements Machine<T> {
public inStream: Stream<T>) {
}

start(outStream: Observer<T>): void {
start(outStream: Stream<T>): void {
this.proxy = {
next: (t: T) => {
if (this.spy) {
Expand All @@ -19,8 +19,8 @@ export class DebugMachine<T> implements Machine<T> {
}
outStream.next(t);
},
error: outStream.error,
complete: outStream.complete,
error: (err) => outStream.error(err),
complete: () => outStream.complete(),
};
this.inStream.subscribe(this.proxy);
}
Expand Down
6 changes: 3 additions & 3 deletions src/operator/FilterMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ export class FilterMachine<T> implements Machine<T> {
public inStream: Stream<T>) {
}

start(outStream: Observer<T>): void {
start(outStream: Stream<T>): void {
this.proxy = {
next: (t: T) => {
if (this.predicate(t)) outStream.next(t);
},
error: outStream.error,
complete: outStream.complete,
error: (err) => outStream.error(err),
complete: () => outStream.complete(),
};
this.inStream.subscribe(this.proxy);
}
Expand Down
6 changes: 3 additions & 3 deletions src/operator/MapMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ export class MapMachine<T, U> implements Machine<U> {
public inStream: Stream<T>) {
}

start(outStream: Observer<U>): void {
start(outStream: Stream<U>): void {
this.proxy = {
next: (t: T) => outStream.next(this.projection(t)),
error: outStream.error,
complete: outStream.complete,
error: (err) => outStream.error(err),
complete: () => outStream.complete(),
};
this.inStream.subscribe(this.proxy);
}
Expand Down
6 changes: 3 additions & 3 deletions src/operator/SkipMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ export class SkipMachine<T> implements Machine<T> {
this.skipped = 0;
}

start(outStream: Observer<T>): void {
start(outStream: Stream<T>): void {
this.proxy = {
next: (t: T) => {
if (this.skipped++ >= this.max) outStream.next(t);
},
error: outStream.error,
complete: outStream.complete,
error: (err) => outStream.error(err),
complete: () => outStream.complete(),
};
this.inStream.subscribe(this.proxy);
}
Expand Down
6 changes: 3 additions & 3 deletions src/operator/TakeMachine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export class TakeMachine<T> implements Machine<T> {
this.taken = 0;
}

start(outStream: Observer<T>): void {
start(outStream: Stream<T>): void {
this.proxy = {
next: (t: T) => {
if (this.taken++ < this.max) {
Expand All @@ -21,8 +21,8 @@ export class TakeMachine<T> implements Machine<T> {
this.stop();
}
},
error: outStream.error,
complete: outStream.complete,
error: (err) => outStream.error(err),
complete: () => outStream.complete(),
};
this.inStream.subscribe(this.proxy);
}
Expand Down

0 comments on commit 2caa2ca

Please sign in to comment.